#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
char *osd_base_path = "./osddata";
char *ebofs_base_path = "./ebofsdev";
-#define ROLE_TYPE(x) ((x)>0 ? 1:(x))
-
if (g_conf.ebofs) {
store = new Ebofs(dev_path);
+ //store->_fake_writes(true);
}
#ifdef USE_OBFS
else if (g_conf.uofs) {
osd_logtype.add_inc("c_wr");
osd_logtype.add_inc("c_wrb");
- osd_logtype.add_inc("r_pull");
- osd_logtype.add_inc("r_pullb");
+ osd_logtype.add_inc("r_push");
+ osd_logtype.add_inc("r_pushb");
osd_logtype.add_inc("r_wr");
osd_logtype.add_inc("r_wrb");
}
osd_lock.Unlock();
+ dout(0) << "osd_rep " << g_conf.osd_rep << endl;
+
return 0;
}
if (pg_lock.count(pgid)) {
Cond c;
dout(15) << "lock_pg " << hex << pgid << dec << " waiting as " << &c << endl;
+ //cerr << "lock_pg " << hex << pgid << dec << " waiting as " << &c << endl;
list<Cond*>& ls = pg_lock_waiters[pgid]; // this is commit, right?
ls.push_back(&c);
-void OSD::handle_op_reply(MOSDOpReply *m)
-{
- if (m->get_map_epoch() < boot_epoch) {
- dout(3) << "replica op reply from before boot" << endl;
- delete m;
- return;
- }
-
-
- // handle op
- switch (m->get_op()) {
- case OSD_OP_REP_PULL:
- op_rep_pull_reply(m);
- break;
-
- case OSD_OP_REP_WRNOOP:
- case OSD_OP_REP_WRITE:
- case OSD_OP_REP_TRUNCATE:
- case OSD_OP_REP_DELETE:
- case OSD_OP_REP_WRLOCK:
- case OSD_OP_REP_WRUNLOCK:
- case OSD_OP_REP_RDLOCK:
- case OSD_OP_REP_RDUNLOCK:
- case OSD_OP_REP_UPLOCK:
- case OSD_OP_REP_DNLOCK:
- {
- const pg_t pgid = m->get_pg();
- if (pg_map.count(pgid)) {
- PG *pg = _lock_pg(pgid);
- assert(pg);
- handle_rep_op_ack(pg, m->get_tid(), m->get_result(), m->get_commit(), MSG_ADDR_NUM(m->get_source()),
- m->get_pg_complete_thru());
- _unlock_pg(pgid);
- } else {
- // pg dne! whatev.
- }
- delete m;
- }
- break;
-
- default:
- assert(0);
- }
-}
-
-/*
- * NOTE: called holding pg lock /////osd_lock, opqueue active.
- */
-void OSD::handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit,
- int fromosd, eversion_t pg_complete_thru)
-{
- if (!pg->replica_ops.count(tid)) {
- dout(7) << "not waiting for repop reply tid " << tid << " in " << *pg
- << ", map must have changed, dropping." << endl;
- return;
- }
-
- OSDReplicaOp *repop = pg->replica_ops[tid];
- MOSDOp *op = repop->op;
-
- dout(7) << "handle_rep_op_ack " << tid << " op " << op
- << " result " << result << " commit " << commit << " from osd" << fromosd
- << " in " << *pg
- << endl;
-
- /*
- * for now, we take a lazy approach to handling replica set changes
- * that overlap with writes. replicas with newer maps will reply with
- * result == -1, but we treat them as a success, and ack the write to
- * the client. this means somewhat weakened safety semantics for the client
- * write, but is much simpler on the osd end. and no weaker than the rest of the
- * data in the PG.. or this same write, if it had completed just before the failure.
- *
- * meanwhile, the regular recovery process will handle the object version
- * mismatch.. the new primary (and others) will pull the latest from the old
- * primary. because of the PGLog stuff, it'll be pretty efficient, aside from
- * the fact that the entire object is copied.
- *
- * one optimization: if the rep_write is received by the new primary, they can
- * (at their discretion) apply it and remove the object from their missing list...
- * or: if a replica sees tha the old primary is not down, it might assume that its
- * state will be recovered (ie the new version) and apply the write.
- */
- if (1) { //if (result >= 0) {
- // success
- get_repop(repop);
- {
- if (commit) {
- // commit
- assert(repop->waitfor_commit.count(tid));
- repop->waitfor_commit.erase(tid);
- repop->waitfor_ack.erase(tid);
-
- repop->pg_complete_thru[fromosd] = pg_complete_thru;
-
- pg->replica_ops.erase(tid);
- pg->replica_tids_by_osd[fromosd].erase(tid);
- if (pg->replica_tids_by_osd[fromosd].empty()) pg->replica_tids_by_osd.erase(fromosd);
- } else {
- // ack
- repop->waitfor_ack.erase(tid);
- }
- }
- put_repop(repop);
- }
-
-}
-
-
void OSD::handle_osd_ping(MOSDPing *m)
{
for (map<int,entity_inst_t>::iterator i = inc.new_down.begin();
i != inc.new_down.end();
i++) {
- if (i->first == whoami) continue;
- messenger->mark_down(MSG_ADDR_OSD(i->first), i->second);
- peer_map_epoch.erase(MSG_ADDR_OSD(i->first));
+ int osd = i->first;
+ if (osd == whoami) continue;
+ messenger->mark_down(MSG_ADDR_OSD(osd), i->second);
+ peer_map_epoch.erase(MSG_ADDR_OSD(osd));
+
+ // kick any replica ops
+ for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
+ it != pg_map.end();
+ it++) {
+ PG *pg = it->second;
+ list<PG::RepOpGather*> ls; // do async; repop_ack() may modify pg->repop_gather
+ for (map<tid_t,PG::RepOpGather*>::iterator p = pg->repop_gather.begin();
+ p != pg->repop_gather.end();
+ p++) {
+ if (p->second->waitfor_ack.count(osd) ||
+ p->second->waitfor_commit.count(osd))
+ ls.push_back(p->second);
+ }
+ for (list<PG::RepOpGather*>::iterator p = ls.begin();
+ p != ls.end();
+ p++)
+ repop_ack(pg, *p, -1, true, osd);
+ }
}
for (map<int,entity_inst_t>::iterator i = inc.new_up.begin();
i != inc.new_up.end();
<< endl;
if (osdmap->is_mkfs()) {
- dout(1) << "mkfs" << endl;
+ ps_t maxps = 1ULL << osdmap->get_pg_bits();
+ dout(1) << "mkfs on " << osdmap->get_pg_bits() << " bits, " << maxps << " pgs" << endl;
assert(osdmap->get_epoch() == 1);
//cerr << "osdmap " << osdmap->get_ctime() << " logger start " << logger->get_start() << endl;
logger->set_start( osdmap->get_ctime() );
- ps_t maxps = 1LL << osdmap->get_pg_bits();
-
// create PGs
for (int nrep = 1;
nrep <= MIN(g_conf.num_osd, g_conf.osd_max_rep); // for low osd counts.. hackish bleh
nrep++) {
for (pg_t ps = 0; ps < maxps; ps++) {
pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
- int role = osdmap->get_pg_acting_role(pgid, whoami);
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
if (role < 0) continue;
PG *pg = create_pg(pgid, t);
- osdmap->pg_to_acting_osds(pgid, pg->acting);
pg->set_role(role);
+ pg->acting.swap(acting);
pg->last_epoch_started_any =
pg->info.last_epoch_started =
pg->info.same_primary_since =
+ pg->info.same_acker_since =
pg->info.same_role_since = osdmap->get_epoch();
pg->activate(t);
// local PG too
pg_t pgid = osdmap->osd_nrep_to_pg(whoami, nrep);
- int role = osdmap->get_pg_acting_role(pgid, whoami);
- if (role < 0) continue;
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
PG *pg = create_pg(pgid, t);
- osdmap->pg_to_acting_osds(pgid, pg->acting);
+ pg->acting.swap(acting);
pg->set_role(role);
pg->last_epoch_started_any =
pg->info.last_epoch_started =
pg->info.same_primary_since =
+ pg->info.same_acker_since =
pg->info.same_role_since = osdmap->get_epoch();
pg->activate(t);
// get new acting set
vector<int> acting;
int nrep = osdmap->pg_to_acting_osds(pgid, acting);
-
- int primary = -1;
- if (nrep > 0) primary = acting[0];
-
- int role = -1; // -1, 0, 1
- for (int i=0; i<nrep; i++)
- if (acting[i] == whoami) role = i > 0 ? 1:0;
-
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
// no change?
if (acting == pg->acting)
continue;
+
+ int primary = -1;
+ if (nrep > 0) primary = acting[0];
- // primary changed?
int oldrole = pg->get_role();
int oldprimary = pg->get_primary();
+ int oldacker = pg->get_acker();
vector<int> oldacting = pg->acting;
// update PG
- pg->acting = acting;
- pg->calc_role(whoami);
+ pg->acting.swap(acting);
+ pg->set_role(role);
- // did primary change?
+ // did primary|acker change?
if (oldprimary != primary) {
pg->info.same_primary_since = osdmap->get_epoch();
pg->cancel_recovery();
}
-
+ if (oldacker != pg->get_acker()) {
+ pg->info.same_acker_since = osdmap->get_epoch();
+ }
+ if (oldprimary != primary || oldacker != pg->get_acker()) {
+ // drop our write-ahead log. (we'll only have on if we were just the acker)
+ pg->trim_write_ahead();
+ }
+
if (role != oldrole) {
pg->info.same_role_since = osdmap->get_epoch();
dout(10) << *pg << " " << oldacting << " -> " << acting
<< ", replicas changed" << endl;
+ // completely restart peering process.
+ pg->clear_primary_state();
+
+ /* this is compliated, deal with it later.
// clear peer_info for (re-)new replicas
for (unsigned i=1; i<acting.size(); i++) {
bool had = false;
for (unsigned j=1; j<oldacting.size(); j++)
- if (acting[i] == oldacting[j]) {
+ if (pg->acting[i] == oldacting[j]) {
had = true;
break;
}
if (!had) {
- dout(10) << *pg << " hosing any peer state for new replica osd" << acting[i] << endl;
- pg->peer_info.erase(acting[i]);
- pg->peer_info_requested.erase(acting[i]);
- pg->peer_missing.erase(acting[i]);
- pg->peer_log_requested.erase(acting[i]);
- pg->peer_summary_requested.erase(acting[i]);
+ dout(10) << *pg << " hosing any peer state for new replica osd" << pg->acting[i] << endl;
+ pg->peer_info.erase(pg->acting[i]);
+ pg->peer_info_requested.erase(pg->acting[i]);
+ pg->peer_missing.erase(pg->acting[i]);
+ pg->peer_log_requested.erase(pg->acting[i]);
+ pg->peer_summary_requested.erase(pg->acting[i]);
}
}
+ */
}
}
}
- // scan (FIXME newly!) down osds
- for (set<int>::const_iterator down = osdmap->get_down_osds().begin();
- down != osdmap->get_down_osds().end();
- down++) {
- if (*down == whoami) continue;
-
- // old peer?
- bool have = false;
- for (unsigned i=0; i<oldacting.size(); i++)
- if (oldacting[i] == *down) have = true;
- if (!have) continue;
-
- dout(10) << *pg << " old peer osd" << *down << " is down" << endl;
-
- // NAK any ops to the down osd
- if (pg->replica_tids_by_osd.count(*down)) {
- set<__uint64_t> s = pg->replica_tids_by_osd[*down];
- dout(10) << " " << *pg << " naking replica ops to down osd" << *down << " " << s << endl;
- for (set<__uint64_t>::iterator tid = s.begin();
- tid != s.end();
- tid++)
- handle_rep_op_ack(pg, *tid, -1, true, *down);
- }
- }
}
}
}
pg->read_log(store);
// generate state for current mapping
- //int nrep =
- osdmap->pg_to_acting_osds(pgid, pg->acting);
- int role = -1;
- for (unsigned i=0; i<pg->acting.size(); i++)
- if (pg->acting[i] == whoami) role = i>0 ? 1:0;
+ int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
+ int role = osdmap->calc_pg_role(whoami, pg->acting, nrep);
pg->set_role(role);
dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
if (pg_map.count(pgid) == 0) {
// check mapping.
vector<int> acting;
- osdmap->pg_to_acting_osds(pgid, acting);
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ if (!nrep) {
+ dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " has null mapping" << endl;
+ continue;
+ }
// am i still the primary?
assert(it->same_primary_since <= osdmap->get_epoch());
// ok, create PG!
pg = create_pg(pgid, t);
- pg->acting = acting;
- pg->info.same_primary_since = it->same_primary_since;
+ pg->acting.swap( acting );
pg->set_role(0);
+ pg->info.same_primary_since = it->same_primary_since;
+ pg->info.same_acker_since = it->same_acker_since;
pg->info.same_role_since = osdmap->get_epoch();
pg->last_epoch_started_any = it->last_epoch_started;
// ok active!
pg->info.same_primary_since = m->info.same_primary_since;
+ pg->info.same_acker_since = m->info.same_acker_since;
pg->activate(t);
}
if (pg_map.count(pgid) == 0) {
// get active rush mapping
vector<int> acting;
- //int nrep =
- osdmap->pg_to_acting_osds(pgid, acting);
- //assert(nrep > 0);
- int role = -1;
- for (unsigned i=0; i<acting.size(); i++)
- if (acting[i] == whoami) role = i>0 ? 1:0;
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
if (role == 0) {
dout(10) << " pg " << hex << pgid << dec << " dne, and i am primary. just waiting for notify." << endl;
ObjectStore::Transaction t;
PG *pg = create_pg(pgid, t);
- pg->acting = acting;
+ pg->acting.swap( acting );
pg->set_role(role);
- pg->info.same_primary_since = it->second.same_primary_since; //calc_pg_primary_since(acting[0], pgid, m->get_epoch());
+ pg->info.same_primary_since = it->second.same_primary_since;
+ pg->info.same_acker_since = it->second.same_acker_since;
pg->info.same_role_since = osdmap->get_epoch();
t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info));
+/*** RECOVERY ***/
-// RECOVERY
-
-
-
-
-
-// pull
-
-
+/** pull - request object from a peer
+ */
void OSD::pull(PG *pg, object_t oid, eversion_t v)
{
assert(pg->missing.loc.count(oid));
MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
oid, pg->get_pgid(),
osdmap->get_epoch(),
- OSD_OP_REP_PULL);
+ OSD_OP_PULL);
op->set_version(v);
- op->set_pg_role(-1); // whatever, not 0
messenger->send_message(op, MSG_ADDR_OSD(osd));
// take note
}
-/** op_rep_pull
- * process request to pull an entire object.
- * NOTE: called from opqueue.
+/** push - send object to a peer
*/
-void OSD::op_rep_pull(MOSDOp *op, PG *pg)
+void OSD::push(PG *pg, object_t oid, int dest)
{
- const object_t oid = op->get_oid();
-
- dout(7) << "rep_pull on " << hex << oid << dec << " v >= " << op->get_version() << endl;
-
// read data+attrs
bufferlist bl;
eversion_t v;
int vlen = sizeof(v);
map<string,bufferptr> attrset;
-
+
ObjectStore::Transaction t;
t.read(oid, 0, 0, &bl);
t.getattr(oid, "version", &v, &vlen);
t.getattrs(oid, attrset);
unsigned tr = store->apply_transaction(t);
-
- if (tr != 0) {
- // reply with -EEXIST
- dout(7) << "rep_pull don't have " << hex << oid << dec << endl;
- MOSDOpReply *reply = new MOSDOpReply(op, -EEXIST, osdmap->get_epoch(), true);
- messenger->send_message(reply, op->get_asker());
- delete op;
- return;
- }
- dout(7) << "rep_pull has "
- << hex << op->get_oid() << dec
- << " v " << v << " >= " << op->get_version()
+ assert(tr == 0); // !!!
+
+ // ok
+ dout(7) << *pg << " push " << hex << oid << dec << " v " << v
<< " size " << bl.length()
- << " in " << *pg
+ << " to osd" << dest
<< endl;
- assert(v >= op->get_version());
- logger->inc("r_pull");
- logger->inc("r_pullb", bl.length());
+ logger->inc("r_push");
+ logger->inc("r_pushb", bl.length());
- // reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), true);
- reply->set_result(0);
- reply->set_length(bl.length());
- reply->set_data(bl); // note: claims bl, set length above here!
- reply->set_offset(0);
- reply->set_version(v);
- reply->set_attrset(attrset);
-
- messenger->send_message(reply, op->get_asker());
+ // send
+ MOSDOp *op = new MOSDOp(++last_tid, MSG_ADDR_OSD(whoami),
+ oid, pg->info.pgid, osdmap->get_epoch(),
+ OSD_OP_PUSH);
+ op->set_offset(0);
+ op->set_length(bl.length());
+ op->set_data(bl); // note: claims bl, set length above here!
+ op->set_version(v);
+ op->set_attrset(attrset);
- delete op;
+ messenger->send_message(op, MSG_ADDR_OSD(dest));
}
-/*
- * NOTE: called holding osd_lock. opqueue active.
+/** op_pull
+ * process request to pull an entire object.
+ * NOTE: called from opqueue.
*/
-void OSD::op_rep_pull_reply(MOSDOpReply *op)
+void OSD::op_pull(MOSDOp *op, PG *pg)
{
- object_t oid = op->get_oid();
- eversion_t v = op->get_version();
- pg_t pgid = op->get_pg();
+ const object_t oid = op->get_oid();
+ const eversion_t v = op->get_version();
+ int from = op->get_source().num();
- if (pg_map.count(pgid) == 0) {
- dout(7) << "rep_pull_reply on pg " << hex << pgid << dec << ", dne" << endl;
+ dout(7) << "op_pull " << hex << oid << dec << " v " << op->get_version()
+ << " from " << op->get_source()
+ << endl;
+
+ // am i missing it?
+ if (waitfor_missing_object(op, pg)) {
+ // ok, i better the primary... (or else there's a map mismatch, or the primary is wrong about my objects, or?)
+ assert(pg->is_primary());
return;
}
- PG *pg = _lock_pg(pgid);
+ // is a replica asking? are they missing it?
+ if (pg->is_primary() &&
+ (pg->peer_missing.count(from) == 0 ||
+ !pg->peer_missing[from].is_missing(oid))) {
+ dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << endl;
+ delete op;
+ return;
+ }
+
+ // push it back!
+ push(pg, oid, op->get_source().num());
+}
- if (!pg->objects_pulling.count(oid)) {
- dout(7) << "rep_pull_reply on object " << hex << oid << dec << ", not pulling" << endl;
- _unlock_pg(pgid);
+
+/** op_push
+ * NOTE: called from opqueue.
+ */
+void OSD::op_push(MOSDOp *op, PG *pg)
+{
+ object_t oid = op->get_oid();
+ eversion_t v = op->get_version();
+
+ if (!pg->missing.is_missing(oid)) {
+ dout(7) << "op_push not missing object " << hex << oid << dec << endl;
return;
}
- dout(7) << "rep_pull_reply "
+ dout(7) << "op_push "
<< hex << oid << dec
<< " v " << v
<< " size " << op->get_length() << " " << op->get_data().length()
<< endl;
assert(op->get_data().length() == op->get_length());
-
+
// write object and add it to the PG
ObjectStore::Transaction t;
t.remove(oid); // in case old version exists
t.write(oid, 0, op->get_length(), op->get_data());
t.setattrs(oid, op->get_attrset());
- t.collection_add(pgid, oid);
-
- // close out pull op.
- num_pulling--;
- pg->objects_pulling.erase(oid);
- pg->missing.got(oid, v);
-
- // kick waiters
- if (pg->waiting_for_missing_object.count(oid))
- take_waiters(pg->waiting_for_missing_object[oid]);
+ t.collection_add(pg->info.pgid, oid);
// raise last_complete?
assert(pg->log.complete_to != pg->log.log.end());
}
dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl;
- // continue
- pg->do_recovery();
-
// apply to disk!
- t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+ t.collection_setattr(pg->info.pgid, "info", &pg->info, sizeof(pg->info));
unsigned r = store->apply_transaction(t);
assert(r == 0);
- _unlock_pg(pgid);
+
+ // close out pull op?
+ num_pulling--;
+ if (pg->objects_pulling.count(oid))
+ pg->objects_pulling.erase(oid);
+ pg->missing.got(oid, v);
+
+ // am i primary? are others missing this too?
+ if (pg->is_primary()) {
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ int peer = pg->acting[i];
+ if (pg->peer_missing.count(peer) &&
+ pg->peer_missing[peer].is_missing(oid)) {
+ // ok, push it, and they (will) have it now.
+ pg->peer_missing[peer].got(oid, v);
+ push(pg, oid, peer);
+ }
+ }
+
+ // continue recovery
+ pg->do_recovery();
+ }
+
+ // kick waiters
+ if (pg->waiting_for_missing_object.count(oid))
+ take_waiters(pg->waiting_for_missing_object[oid]);
delete op;
}
public:
OSD *osd;
MOSDOp *op;
+ int destosd;
eversion_t pg_last_complete;
bool acked;
bool waiting;
- C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, eversion_t lc) : osd(o), op(oo), pg_last_complete(lc),
- acked(false), waiting(false) { }
+ C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, int dosd, eversion_t lc) :
+ osd(o), op(oo), destosd(dosd), pg_last_complete(lc),
+ acked(false), waiting(false) { }
void finish(int r) {
lock.Lock();
while (!acked) {
}
assert(acked);
lock.Unlock();
- osd->op_rep_modify_commit(op, pg_last_complete);
+ osd->op_rep_modify_commit(op, destosd, pg_last_complete);
}
void ack() {
lock.Lock();
}
};
-void OSD::op_rep_modify_commit(MOSDOp *op, eversion_t last_complete)
+void OSD::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete)
{
// send commit.
- dout(10) << "rep_modify_commit on op " << *op << endl;
+ dout(10) << "rep_modify_commit on op " << *op
+ << ", sending commit to osd" << ackerosd
+ << endl;
MOSDOpReply *commit = new MOSDOpReply(op, 0, osdmap->get_epoch(), true);
commit->set_pg_complete_thru(last_complete);
- messenger->send_message(commit, op->get_asker());
+ messenger->send_message(commit, MSG_ADDR_OSD(ackerosd));
delete op;
}
class C_OSD_WriteCommit : public Context {
public:
OSD *osd;
- OSDReplicaOp *repop;
+ pg_t pgid;
+ tid_t rep_tid;
eversion_t pg_last_complete;
- C_OSD_WriteCommit(OSD *o, OSDReplicaOp *op, eversion_t lc) : osd(o), repop(op), pg_last_complete(lc) {}
+ C_OSD_WriteCommit(OSD *o, pg_t p, tid_t rt, eversion_t lc) : osd(o), pgid(p), rep_tid(rt), pg_last_complete(lc) {}
void finish(int r) {
- osd->op_modify_commit(repop, pg_last_complete);
+ osd->op_modify_commit(pgid, rep_tid, pg_last_complete);
}
};
object_t oid = op->get_oid();
eversion_t nv = op->get_version();
+ const char *opname = MOSDOp::get_opname(op->get_op());
+ dout(10) << "op_rep_modify " << opname
+ << " " << hex << oid << dec
+ << " v " << nv
+ << " " << op->get_offset() << "~" << op->get_length()
+ << " in " << *pg
+ << endl;
+
+ // we better not be missing this.
+ assert(!pg->missing.is_missing(oid));
+
+ // prepare our transaction
ObjectStore::Transaction t;
+ // update PG log
if (op->get_op() != OSD_OP_WRNOOP) {
- // update PG log
- if (pg->info.last_update < nv)
- prepare_log_transaction(t, op, nv, pg, op->get_pg_trim_to());
- // else, we are playing catch-up, don't update pg metadata! (FIXME?)
+ prepare_log_transaction(t, op, nv, pg, op->get_pg_trim_to());
+ logger->inc("r_wr");
+ logger->inc("r_wrb", op->get_length());
}
+
+ // am i acker?
+ PG::RepOpGather *repop = 0;
+ int ackerosd = pg->acting[0];
+
+ if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) {
+ ackerosd = pg->get_tail();
- // do op?
- C_OSD_RepModifyCommit *oncommit = 0;
-
- // check current version
- eversion_t myv = 0;
- store->getattr(oid, "version", &myv, sizeof(myv)); // this is a noop if oid dne
- dout(10) << "op_rep_modify existing " << hex << oid << dec << " v " << myv << endl;
-
- // is this an old update? or WRNOOP?
- if (nv <= myv || op->get_op() == OSD_OP_WRNOOP) {
- // we have a newer version. pretend we do a regular commit!
- dout(10) << "op_rep_modify on " << hex << oid << dec
- << " v " << nv << " <= myv | wrnoop, noop"
- << " in " << *pg
- << endl;
- oncommit = new C_OSD_RepModifyCommit(this, op,
- pg->info.last_complete);
+ if (pg->is_tail()) {
+ // i am tail acker.
+ if (pg->repop_gather.count(op->get_rep_tid())) {
+ repop = pg->repop_gather[ op->get_rep_tid() ];
+ } else {
+ repop = new_repop_gather(pg, op);
+ }
+
+ // infer ack from source
+ int fromosd = op->get_source().num();
+ get_repop_gather(repop);
+ {
+ //assert(repop->waitfor_ack.count(fromosd)); // no, we may come thru here twice.
+ repop->waitfor_ack.erase(fromosd);
+ }
+ put_repop_gather(pg, repop);
+
+ // prepare dest socket
+ //messenger->prepare_send_message(op->get_client());
+ }
+
+ // chain? forward?
+ if (g_conf.osd_rep == OSD_REP_CHAIN && !pg->is_tail()) {
+ // chain rep, not at the tail yet.
+ int myrank = osdmap->calc_pg_rank(whoami, pg->acting);
+ issue_repop(pg, op, pg->acting[myrank+1]);
+ }
}
- // missing?
- else if (pg->missing.missing.count(oid)) {
- // old or missing. wait!
- dout(10) << "op_rep_modify on " << hex << oid << dec
- << " v " << nv << " > myv, wait"
- << " in " << *pg
- << endl;
- if (pg->missing.missing[oid] > op->get_version())
- pg->missing.add(oid, op->get_version()); // now we're missing the _newer_ version.
- waitfor_missing_object(op, pg);
- }
- else {
- // we're good.
- dout(10) << "op_rep_modify on " << hex << oid << dec
- << " v " << nv << " (from " << myv << ")"
- << " in " << *pg
- << endl;
- assert(op->get_old_version() == myv);
-
- prepare_op_transaction(t, op, nv, pg);
- oncommit = new C_OSD_RepModifyCommit(this, op,
- pg->info.last_complete);
+ // do op?
+ C_OSD_RepModifyCommit *oncommit = 0;
+ if (repop) {
+ // acker. we'll apply later.
+ } else {
+ // middle|replica.
+ if (op->get_op() != OSD_OP_WRNOOP)
+ prepare_op_transaction(t, op, nv, pg);
- logger->inc("r_wr");
- logger->inc("r_wrb", op->get_length());
+ oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, pg->info.last_complete);
}
- // go
+ // apply log update. and possibly update itself.
unsigned tr = store->apply_transaction(t, oncommit);
if (tr != 0 && // no errors
tr != 2) { // or error on collection_add
cerr << "error applying transaction: r = " << tr << endl;
assert(tr == 0);
}
-
+
// ack?
- if (oncommit) {
- // ack
- MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
- messenger->send_message(ack, op->get_asker());
- oncommit->ack();
+ if (repop) {
+ // (logical) local ack. this may induce the actual update.
+ get_repop_gather(repop);
+ {
+ assert(repop->waitfor_ack.count(whoami));
+ repop->waitfor_ack.erase(whoami);
+ }
+ put_repop_gather(pg, repop);
+ }
+ else {
+ // send ack to acker?
+ if (g_conf.osd_rep != OSD_REP_CHAIN) {
+ MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
+ messenger->send_message(ack, MSG_ADDR_OSD(ackerosd));
+ }
- pg->last_heartbeat = g_clock.now();
+ // ack myself.
+ assert(oncommit);
+ oncommit->ack();
}
}
void OSD::handle_op(MOSDOp *op)
{
const pg_t pgid = op->get_pg();
- int acting_primary = osdmap->get_pg_acting_primary( pgid );
PG *pg = get_pg(pgid);
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
+ // share our map with sender, if they're old
_share_map_incoming(op->get_source(), op->get_map_epoch());
// what kind of op?
- if (!OSD_OP_IS_REP(op->get_op())) {
+ if (!op->get_source().is_osd()) {
// REGULAR OP (non-replication)
// have pg?
waiting_for_pg[pgid].push_back(op);
return;
}
-
- // am i the (same) primary?
- if (acting_primary != whoami ||
- op->get_map_epoch() < pg->info.same_primary_since) {
- dout(7) << "acting primary is osd" << acting_primary
- << " since " << pg->info.same_primary_since
- << ", dropping" << endl;
- assert(op->get_map_epoch() < osdmap->get_epoch());
- return;
+
+ bool read = op->get_op() < 10;
+
+ if (read) {
+ // read. am i the (same) acker?
+ if (pg->get_acker() != whoami ||
+ op->get_map_epoch() < pg->info.same_acker_since) {
+ dout(7) << "acting acker is osd" << pg->get_acker()
+ << " since " << pg->info.same_acker_since
+ << ", dropping" << endl;
+ assert(op->get_map_epoch() < osdmap->get_epoch());
+ return;
+ }
+ } else {
+ // write. am i the (same) primary?
+ if (pg->get_primary() != whoami ||
+ op->get_map_epoch() < pg->info.same_primary_since) {
+ dout(7) << "acting primary is osd" << pg->get_primary()
+ << " since " << pg->info.same_primary_since
+ << ", dropping" << endl;
+ assert(op->get_map_epoch() < osdmap->get_epoch());
+ return;
+ }
}
-
+
// must be active.
if (!pg->is_active()) {
// replay?
pg->waiting_for_active.push_back(op);
return;
}
-
+
dout(7) << "handle_op " << op << " in " << *pg << endl;
} else {
- // REPLICATION OP
+ // REPLICATION OP (it's from another OSD)
// have pg?
if (!pg) {
dout(7) << "handle_rep_op " << op
- << " in pgid " << hex << pgid << dec << endl;
- waiting_for_pg[pgid].push_back(op);
+ << " pgid " << hex << pgid << dec << " dne" << endl;
+ delete op;
return;
}
-
- // check osd map. same primary?
+
+ // check osd map: same primary+acker?
if (op->get_map_epoch() != osdmap->get_epoch()) {
- // make sure source is still primary
- const int myrole = pg->get_role(); //osdmap->get_pg_acting_role(op->get_pg(), whoami);
-
- if (acting_primary != MSG_ADDR_NUM(op->get_source()) ||
- myrole <= 0 ||
- op->get_map_epoch() < pg->info.same_primary_since) {
- dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
- << ", primary changed on pg " << hex << op->get_pg() << dec
- << endl;
- MOSDOpReply *fail = new MOSDOpReply(op, -EAGAIN, osdmap->get_epoch(), true); // FIXME error code?
- messenger->send_message(fail, op->get_asker());
+ if (op->get_map_epoch() < pg->info.same_primary_since ||
+ op->get_map_epoch() < pg->info.same_acker_since) {
+ // drop message.
+ delete op;
return;
}
-
- dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
- << ", primary same on pg " << hex << op->get_pg() << dec
+ assert(pg->get_role() >= 0);
+
+ dout(5) << "handle_rep_op map " << op->get_map_epoch() << " != " << osdmap->get_epoch()
+ << ", but primary+acker same in " << *pg
<< endl;
}
}
}
+void OSD::handle_op_reply(MOSDOpReply *op)
+{
+ if (op->get_map_epoch() < boot_epoch) {
+ dout(3) << "replica op reply from before boot" << endl;
+ delete op;
+ return;
+ }
+
+ // must be a rep op.
+ assert(op->get_source().is_osd());
+
+ // make sure we have the pg
+ const pg_t pgid = op->get_pg();
+ PG *pg = get_pg(pgid);
+
+ // require same or newer map
+ if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
+
+ // share our map with sender, if they're old
+ _share_map_incoming(op->get_source(), op->get_map_epoch());
+
+ if (!pg) {
+ // hmm.
+ delete op;
+ }
+
+ if (g_conf.osd_maxthreads < 1) {
+ do_op(op, pg); // do it now
+ } else {
+ enqueue_op(pgid, op); // queue for worker threads
+ }
+}
+
/*
* enqueue called with osd_lock held
*/
-void OSD::enqueue_op(pg_t pgid, MOSDOp *op)
+void OSD::enqueue_op(pg_t pgid, Message *op)
{
while (pending_ops > g_conf.osd_max_opq) {
dout(10) << "enqueue_op waiting for pending_ops " << pending_ops << " to drop to " << g_conf.osd_max_opq << endl;
*/
void OSD::dequeue_op(pg_t pgid)
{
- MOSDOp *op;
+ Message *op;
PG *pg;
osd_lock.Lock();
pg = _lock_pg(pgid);
// get pending op
- list<MOSDOp*> &ls = op_queue[pgid];
+ list<Message*> &ls = op_queue[pgid];
assert(!ls.empty());
op = ls.front();
ls.pop_front();
* object lock will be held (if multithreaded)
* osd_lock NOT held.
*/
-void OSD::do_op(MOSDOp *op, PG *pg)
+void OSD::do_op(Message *m, PG *pg)
{
- dout(10) << "do_op " << *op
- //<< " on " << hex << op->get_oid() << dec
- << " in " << *pg << endl;
+ //dout(15) << "do_op " << *op << " in " << *pg << endl;
+
+ if (m->get_type() == MSG_OSD_OP) {
+ MOSDOp *op = (MOSDOp*)m;
- logger->inc("op");
+ logger->inc("op");
- // replication ops?
- if (OSD_OP_IS_REP(op->get_op())) {
- // replication/recovery
switch (op->get_op()) {
- case OSD_OP_REP_PULL:
- op_rep_pull(op, pg);
+
+ // rep stuff
+ case OSD_OP_PULL:
+ op_pull(op, pg);
break;
-
- // replicated ops
- case OSD_OP_REP_WRNOOP:
- case OSD_OP_REP_WRITE:
- case OSD_OP_REP_TRUNCATE:
- case OSD_OP_REP_DELETE:
- case OSD_OP_REP_WRLOCK:
- case OSD_OP_REP_WRUNLOCK:
- case OSD_OP_REP_RDLOCK:
- case OSD_OP_REP_RDUNLOCK:
- case OSD_OP_REP_UPLOCK:
- case OSD_OP_REP_DNLOCK:
- op_rep_modify(op, pg);
+ case OSD_OP_PUSH:
+ op_push(op, pg);
break;
-
- default:
- assert(0);
- }
- } else {
- // regular op
- switch (op->get_op()) {
+
+ // reads
case OSD_OP_READ:
op_read(op, pg);
break;
case OSD_OP_STAT:
op_stat(op, pg);
break;
+
+ // writes
case OSD_OP_WRNOOP:
case OSD_OP_WRITE:
case OSD_OP_ZERO:
case OSD_OP_RDUNLOCK:
case OSD_OP_UPLOCK:
case OSD_OP_DNLOCK:
- op_modify(op, pg);
+ if (op->get_source().is_osd())
+ op_rep_modify(op, pg);
+ else
+ op_modify(op, pg);
break;
+
default:
assert(0);
}
- }
+ }
+ else if (m->get_type() == MSG_OSD_OPREPLY) {
+ // must be replication.
+ MOSDOpReply *r = (MOSDOpReply*)m;
+ tid_t rep_tid = r->get_rep_tid();
+
+ if (pg->repop_gather.count(rep_tid)) {
+ // oh, good.
+ int fromosd = r->get_source().num();
+ repop_ack(pg, pg->repop_gather[rep_tid],
+ r->get_result(), r->get_commit(),
+ fromosd,
+ r->get_pg_complete_thru());
+ delete m;
+ } else {
+ // early ack.
+ pg->waiting_for_repop[rep_tid].push_back(r);
+ }
+ } else
+ assert(0);
}
+
+
void OSD::wait_for_no_ops()
{
if (pending_ops > 0) {
//cout << "getattr returns " << len << " on " << hex << oid << dec << endl;
if (len == sizeof(source) &&
- source != op->get_asker()) {
+ source != op->get_client()) {
//the object is locked for writing by someone else -- add the op to the waiting queue
waiting_for_wr_unlock[oid].push_back(op);
return true;
reply->set_length(0);
}
- dout(12) << " read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+ dout(10) << " read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
logger->inc("rd");
if (got >= 0) logger->inc("rdb", got);
// send it
- messenger->send_message(reply, op->get_asker());
+ messenger->send_message(reply, op->get_client());
delete op;
}
memset(&st, sizeof(st), 0);
int r = store->stat(oid, &st);
- dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
+ dout(3) << "op_stat on " << hex << oid << dec
+ << " r = " << r
+ << " size = " << st.st_size
+ << " in " << *pg
+ << endl;
MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap->get_epoch(), true);
reply->set_object_size(st.st_size);
- messenger->send_message(reply, op->get_asker());
+ messenger->send_message(reply, op->get_client());
logger->inc("stat");
-// WRITE OPS
-
-
-
-void OSD::issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd)
-{
- MOSDOp *op = repop->op;
- object_t oid = op->get_oid();
-
- dout(7) << " issue_replica_op in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl;
-
- // forward the write/update/whatever
- __uint64_t tid = ++last_tid;
- MOSDOp *wr = new MOSDOp(tid,
- messenger->get_myaddr(),
- oid,
- pg->get_pgid(),
- osdmap->get_epoch(),
- 100+op->get_op());
- wr->get_data() = op->get_data(); // copy bufferlist
- wr->set_length(op->get_length());
- wr->set_offset(op->get_offset());
- wr->set_version(repop->new_version);
- wr->set_old_version(repop->old_version);
- wr->set_pg_role(1); // replica
- wr->set_pg_trim_to(pg->peers_complete_thru);
- wr->set_orig_asker(op->get_asker());
- wr->set_orig_tid(op->get_tid());
- messenger->send_message(wr, MSG_ADDR_OSD(osd));
-
- repop->osds.insert(osd);
-
- repop->waitfor_ack[tid] = osd;
- repop->waitfor_commit[tid] = osd;
-
- //replica_ops[tid] = repop;
- //replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
- pg->replica_ops[tid] = repop;
- pg->replica_tids_by_osd[osd].insert(tid);
-}
-
+/*********
+ * new repops
+ */
-void OSD::get_repop(OSDReplicaOp *repop)
+void OSD::get_repop_gather(PG::RepOpGather *repop)
{
- repop->lock.Lock();
+ //repop->lock.Lock();
dout(10) << "get_repop " << *repop << endl;
}
-void OSD::put_repop(OSDReplicaOp *repop)
+void OSD::put_repop_gather(PG *pg, PG::RepOpGather *repop)
{
dout(10) << "put_repop " << *repop << endl;
// commit?
if (repop->can_send_commit() &&
repop->op->wants_commit()) {
+ // send commit.
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), true);
dout(10) << "put_repop sending commit on " << *repop << " " << reply << endl;
- messenger->send_message(reply, repop->op->get_asker());
+ messenger->send_message(reply, repop->op->get_client());
repop->sent_commit = true;
}
// ack?
else if (repop->can_send_ack() &&
repop->op->wants_ack()) {
+ // apply
+ dout(10) << "put_repop applying update on " << *repop << endl;
+ ObjectStore::Transaction t;
+ prepare_op_transaction(t, repop->op, repop->new_version, pg);
+ Context *oncommit = new C_OSD_WriteCommit(this, pg->info.pgid, repop->rep_tid, repop->pg_local_last_complete);
+ unsigned r = store->apply_transaction(t, oncommit);
+ if (r)
+ dout(-10) << "put_repop apply transaction return " << r << " on " << *repop << endl;
+
+ // send ack
MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), false);
dout(10) << "put_repop sending ack on " << *repop << " " << reply << endl;
- messenger->send_message(reply, repop->op->get_asker());
+ messenger->send_message(reply, repop->op->get_client());
repop->sent_ack = true;
utime_t now = g_clock.now();
if (repop->can_delete()) {
// adjust peers_complete_thru
if (!repop->pg_complete_thru.empty()) {
- pg_t pgid = repop->op->get_pg();
- osd_lock.Lock();
- PG *pg = get_pg(pgid);
- if (pg) {
- eversion_t min = pg->info.last_complete; // hrm....
- for (unsigned i=1; i<pg->acting.size(); i++) {
- if (repop->pg_complete_thru[i] < min) // note: if we haven't heard, it'll be zero, which is what we want.
- min = repop->pg_complete_thru[i];
- }
-
- if (min > pg->peers_complete_thru) {
- dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
- pg->peers_complete_thru = min;
- }
- //_unlock_pg(pgid);
+ eversion_t min = pg->info.last_complete; // hrm....
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ if (repop->pg_complete_thru[i] < min) // note: if we haven't heard, it'll be zero, which is what we want.
+ min = repop->pg_complete_thru[i];
+ }
+
+ if (min > pg->peers_complete_thru) {
+ dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+ pg->peers_complete_thru = min;
}
- osd_lock.Unlock();
}
dout(10) << "put_repop deleting " << *repop << endl;
- repop->lock.Unlock();
+ //repop->lock.Unlock();
delete repop->op;
delete repop;
} else {
- repop->lock.Unlock();
+ //repop->lock.Unlock();
}
}
-void OSD::op_modify_commit(OSDReplicaOp *repop, eversion_t pg_complete_thru)
+void OSD::issue_repop(PG *pg, MOSDOp *op, int osd)
{
- dout(10) << "op_modify_commit on op " << *repop->op << endl;
- get_repop(repop);
+ object_t oid = op->get_oid();
+
+ dout(7) << " issue_repop rep_tid " << op->get_rep_tid()
+ << " in " << *pg
+ << " o " << hex << oid << dec
+ << " to osd" << osd
+ << endl;
+
+ // forward the write/update/whatever
+ MOSDOp *wr = new MOSDOp(op->get_tid(),
+ op->get_client(),
+ oid,
+ pg->get_pgid(),
+ osdmap->get_epoch(),
+ op->get_op());
+ wr->get_data() = op->get_data(); // _copy_ bufferlist
+ wr->set_length(op->get_length());
+ wr->set_offset(op->get_offset());
+ wr->set_version(op->get_version());
+
+ wr->set_rep_tid(op->get_rep_tid());
+ wr->set_pg_trim_to(pg->peers_complete_thru);
+
+ messenger->send_message(wr, MSG_ADDR_OSD(osd));
+}
+
+PG::RepOpGather *OSD::new_repop_gather(PG *pg,
+ MOSDOp *op)
+{
+ dout(10) << "new_repop_gather rep_tid " << op->get_rep_tid() << " on " << *op << " in " << *pg << endl;
+
+ PG::RepOpGather *repop = new PG::RepOpGather(op, op->get_rep_tid(),
+ op->get_version(),
+ pg->info.last_complete);
+
+ // osds. commits all come to me.
+ for (unsigned i=0; i<pg->acting.size(); i++) {
+ int osd = pg->acting[i];
+ repop->osds.insert(osd);
+ repop->waitfor_commit.insert(osd);
+ }
+
+ // acks vary:
+ if (g_conf.osd_rep == OSD_REP_CHAIN) {
+ // chain rep.
+ // there's my local ack...
+ repop->osds.insert(whoami);
+ repop->waitfor_ack.insert(whoami);
+ repop->waitfor_commit.insert(whoami);
+
+ // also, the previous guy will ack to me
+ int myrank = osdmap->calc_pg_rank(whoami, pg->acting);
+ if (myrank > 0) {
+ int osd = pg->acting[ myrank-1 ];
+ repop->osds.insert(osd);
+ repop->waitfor_ack.insert(osd);
+ repop->waitfor_commit.insert(osd);
+ }
+ } else {
+ // primary, splay. all osds ack to me.
+ for (unsigned i=0; i<pg->acting.size(); i++) {
+ int osd = pg->acting[i];
+ repop->waitfor_ack.insert(osd);
+ }
+ }
+
+ repop->start = g_clock.now();
+
+ pg->repop_gather[ repop->rep_tid ] = repop;
+
+ // anyone waiting? (acks that got here before the op did)
+ if (pg->waiting_for_repop.count(repop->rep_tid)) {
+ take_waiters(pg->waiting_for_repop[repop->rep_tid]);
+ pg->waiting_for_repop.erase(repop->rep_tid);
+ }
+
+ return repop;
+}
+
+
+void OSD::repop_ack(PG *pg, PG::RepOpGather *repop,
+ int result, bool commit,
+ int fromosd, eversion_t pg_complete_thru)
+{
+ MOSDOp *op = repop->op;
+
+ dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *op
+ << " result " << result << " commit " << commit << " from osd" << fromosd
+ << " in " << *pg
+ << endl;
+
+ get_repop_gather(repop);
{
- assert(repop->waitfor_commit.count(0));
- repop->waitfor_commit.erase(0);
- repop->pg_complete_thru[whoami] = pg_complete_thru;
+ if (commit) {
+ // commit
+ assert(repop->waitfor_commit.count(fromosd));
+ repop->waitfor_commit.erase(fromosd);
+ repop->waitfor_ack.erase(fromosd);
+
+ repop->pg_complete_thru[fromosd] = pg_complete_thru;
+
+ if (repop->waitfor_commit.empty()) {
+ pg->repop_gather.erase(repop->rep_tid);
+ }
+ } else {
+ // ack
+ repop->waitfor_ack.erase(fromosd);
+ }
}
- put_repop(repop);
+ put_repop_gather(pg, repop);
}
+
+
+
+
+/** op_modify_commit
+ * transaction commit on the acker.
+ */
+void OSD::op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru)
+{
+ PG *pg = lock_pg(pgid);
+ if (pg) {
+ if (pg->repop_gather.count(rep_tid)) {
+ PG::RepOpGather *repop = pg->repop_gather[rep_tid];
+
+ dout(10) << "op_modify_commit " << *repop->op << endl;
+ get_repop_gather(repop);
+ {
+ assert(repop->waitfor_commit.count(whoami));
+ repop->waitfor_commit.erase(whoami);
+ repop->pg_complete_thru[whoami] = pg_complete_thru;
+ }
+ put_repop_gather(pg, repop);
+ } else {
+ dout(10) << "op_modify_commit pg " << hex << pgid << dec << " rep_tid " << rep_tid << " dne" << endl;
+ }
+
+ unlock_pg(pgid);
+ } else {
+ dout(10) << "op_modify_commit pg " << hex << pgid << dec << " dne" << endl;
+ }
+}
+
+
/** op_modify
* process client modify op
* NOTE: called from opqueue.
// missing?
if (waitfor_missing_object(op, pg)) return;
+ // are any peers missing this?
+ for (unsigned i=1; i<pg->acting.size(); i++) {
+ int peer = pg->acting[i];
+ if (pg->peer_missing.count(i) &&
+ pg->peer_missing[peer].is_missing(oid)) {
+ // push it before this update. FIXME, this is probably extra much work (eg if we're about to overwrite)
+ pg->peer_missing[peer].got(oid);
+ push(pg, oid, peer);
+ }
+ }
+
// dup op?
- reqid_t reqid(op->get_asker(), op->get_tid());
+ reqid_t reqid(op->get_client(), op->get_tid());
if (pg->log.logged_req(reqid)) {
dout(-3) << "op_modify " << opname << " dup op " << reqid
<< ", doing WRNOOP" << endl;
return; // op will be handled later, after the object unlocks
- // old version
- eversion_t ov = 0; // 0 == dne (yet)
- store->getattr(oid, "version", &ov, sizeof(ov));
-
- // new version
- eversion_t nv;
- if (op->get_op() == OSD_OP_WRNOOP)
- nv = ov;
- else {
- nv = pg->info.last_update;
+ // assign version
+ eversion_t nv = pg->log.top;
+ if (op->get_op() != OSD_OP_WRNOOP) {
nv.epoch = osdmap->get_epoch();
nv.version++;
assert(nv > pg->info.last_update);
- assert(nv > ov);
+ assert(nv > pg->log.top);
if (op->get_version().version) {
// replay
if (nv.version < op->get_version().version)
nv.version = op->get_version().version;
}
-
- // set version in op, for benefit of client and our eventual reply
- op->set_version(nv);
}
+
+ // set version in op, for benefit of client and our eventual reply
+ op->set_version(nv);
dout(10) << "op_modify " << opname
<< " " << hex << oid << dec
<< " v " << nv
- << " ov " << ov
- << " off " << op->get_offset() << " len " << op->get_length()
+ << " " << op->get_offset() << "~" << op->get_length()
<< endl;
-
+
// share latest osd map?
osd_lock.Lock();
{
_share_map_outgoing( MSG_ADDR_OSD(i) );
}
osd_lock.Unlock();
-
+
// issue replica writes
- OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
- repop->start = g_clock.now();
- repop->waitfor_ack[0] = whoami; // will need local ack, commit
- repop->waitfor_commit[0] = whoami;
-
- repop->lock.Lock();
- {
- for (unsigned i=1; i<pg->acting.size(); i++)
- issue_replica_op(pg, repop, pg->acting[i]);
+ PG::RepOpGather *repop = 0;
+ bool alone = (pg->acting.size() == 1);
+ tid_t rep_tid = ++last_tid;
+ op->set_rep_tid(rep_tid);
+
+ if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) {
+ // chain rep. send to #2 only.
+ issue_repop(pg, op, pg->acting[1]);
+ }
+ else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
+ // splay rep. send to rest.
+ for (unsigned i=1; i<pg->acting.size(); ++i)
+ //for (unsigned i=pg->acting.size()-1; i>=1; --i)
+ issue_repop(pg, op, pg->acting[i]);
+ } else {
+ // primary rep, or alone.
+ repop = new_repop_gather(pg, op);
+
+ // send to rest.
+ if (!alone)
+ for (unsigned i=1; i<pg->acting.size(); i++)
+ issue_repop(pg, op, pg->acting[i]);
}
- repop->lock.Unlock();
- // prepare
- ObjectStore::Transaction t;
- if (op->get_op() != OSD_OP_WRNOOP) {
+ if (repop) {
+ // we are acker.
+ if (op->get_op() != OSD_OP_WRNOOP) {
+ // log now
+ ObjectStore::Transaction t;
+ prepare_log_transaction(t, op, nv, pg, pg->peers_complete_thru);
+ store->apply_transaction(t);
+
+ // update later.
+ }
+
+ // (logical) local ack.
+ // (if alone, this will apply the update.)
+ get_repop_gather(repop);
+ {
+ assert(repop->waitfor_ack.count(whoami));
+ repop->waitfor_ack.erase(whoami);
+ }
+ put_repop_gather(pg, repop);
+
+ } else {
+ // chain or splay. apply.
+ ObjectStore::Transaction t;
prepare_log_transaction(t, op, nv, pg, pg->peers_complete_thru);
prepare_op_transaction(t, op, nv, pg);
+
+ C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, pg->get_tail(),
+ pg->info.last_complete);
+ unsigned r = store->apply_transaction(t, oncommit);
+ if (r != 0 && // no errors
+ r != 2) { // or error on collection_add
+ cerr << "error applying transaction: r = " << r << endl;
+ assert(r == 0);
+ }
+
+ oncommit->ack();
}
-
- // apply
- Context *oncommit = new C_OSD_WriteCommit(this, repop, pg->info.last_complete);
- unsigned r = store->apply_transaction(t, oncommit);
- if (r != 0 && // no errors
- r != 2) { // or error on collection_add
- cerr << "error applying transaction: r = " << r << endl;
- assert(r == 0);
- }
-
- // pre-ack
- //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
- //messenger->send_message(reply, op->get_asker());
-
- // local ack
- get_repop(repop);
- {
- assert(repop->waitfor_ack.count(0));
- repop->waitfor_ack.erase(0);
- }
- put_repop(repop);
if (op->get_op() == OSD_OP_WRITE) {
logger->inc("c_wr");
eversion_t trim_to)
{
const object_t oid = op->get_oid();
- const pg_t pgid = op->get_pg();
int opcode = PG::Log::Entry::UPDATE;
- if (op->get_op() == OSD_OP_DELETE ||
- op->get_op() == OSD_OP_REP_DELETE) opcode = PG::Log::Entry::DELETE;
+ if (op->get_op() == OSD_OP_DELETE) opcode = PG::Log::Entry::DELETE;
PG::Log::Entry logentry(opcode, op->get_oid(), version,
- op->get_orig_asker(), op->get_orig_tid());
+ op->get_client(), op->get_tid());
dout(10) << "prepare_log_transaction " << op->get_op()
<< (logentry.is_delete() ? " - ":" + ")
<< " v " << version
<< " in " << *pg << endl;
- // raise last_complete?
- if (pg->info.last_complete == pg->log.top)
- pg->info.last_complete = version;
-
- // update pg log
+ // append to log
assert(version > pg->log.top);
- assert(pg->info.last_update == pg->log.top);
pg->log.add(logentry);
assert(pg->log.top == version);
- pg->info.last_update = version;
- // write to pg log
+ // write to pg log on disk
pg->append_log(t, logentry, trim_to);
-
- // write pg info
- t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
}
<< " v " << version
<< " in " << *pg << endl;
- // the op
+ // raise last_complete?
+ if (pg->info.last_complete == pg->info.last_update)
+ pg->info.last_complete = version;
+
+ // raise last_update.
+ assert(version > pg->info.last_update);
+ pg->info.last_update = version;
+
+ // write pg info
+ t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+
+
+ // apply the op
switch (op->get_op()) {
case OSD_OP_WRLOCK:
- case OSD_OP_REP_WRLOCK:
{ // lock object
//r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
- t.setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t));
+ t.setattr(oid, "wrlock", &op->get_client(), sizeof(msg_addr_t));
}
break;
case OSD_OP_WRUNLOCK:
- case OSD_OP_REP_WRUNLOCK:
{ // unlock objects
//r = store->rmattr(oid, "wrlock", oncommit);
t.rmattr(oid, "wrlock");
break;
case OSD_OP_WRITE:
- case OSD_OP_REP_WRITE:
{ // write
assert(op->get_data().length() == op->get_length());
bufferlist bl;
break;
case OSD_OP_TRUNCATE:
- case OSD_OP_REP_TRUNCATE:
{ // truncate
//r = store->truncate(oid, op->get_offset());
t.truncate(oid, op->get_length() );
break;
case OSD_OP_DELETE:
- case OSD_OP_REP_DELETE:
{ // delete
//r = store->remove(oid);
t.remove(oid);
}
// object collection, version
- if (op->get_op() == OSD_OP_DELETE ||
- op->get_op() == OSD_OP_REP_DELETE) {
+ if (op->get_op() == OSD_OP_DELETE) {
// remove object from c
t.collection_remove(pgid, oid);
} else {