state = STATE_BOOTING;
+ hb_stat_ops = 0;
+ hb_stat_qlen = 0;
pending_ops = 0;
waiting_for_no_ops = false;
dispatch(*it);
}
}
-
- // kick myself w/ a ping .. HACK
- //messenger->send_message(new MPing, MSG_ADDR_OSD(whoami));
}
utime_t since = now;
since.sec_ref() -= g_conf.osd_heartbeat_interval;
- dout(-15) << "heartbeat " << now << endl;
+ // calc my stats
+ float avg_qlen = 0;
+ if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops;
+
+ dout(5) << "heartbeat " << now
+ << ": ops " << hb_stat_ops
+ << ", avg qlen " << avg_qlen
+ << endl;
+
+ // reset until next time around
+ hb_stat_ops = 0;
+ hb_stat_qlen = 0;
// send pings
set<int> pingset;
i != pingset.end();
i++) {
_share_map_outgoing( MSG_ADDR_OSD(*i), osdmap->get_inst(*i) );
- messenger->send_message(new MOSDPing(osdmap->get_epoch()),
+ messenger->send_message(new MOSDPing(osdmap->get_epoch(), avg_qlen),
MSG_ADDR_OSD(*i), osdmap->get_inst(*i));
}
void OSD::dispatch(Message *m)
{
- // check clock regularly
- //utime_t now = g_clock.now();
- //dout(-20) << now << endl;
-
- /*// -- don't need lock --
- switch (m->get_type()) {
- return;
- }
- */
-
-
// lock!
osd_lock.Lock();
-
// need OSDMap
switch (m->get_type()) {
dout(20) << "osdping from " << m->get_source() << endl;
_share_map_incoming(m->get_source(), m->get_source_inst(), ((MOSDPing*)m)->map_epoch);
+ int from = m->get_source().num();
+ peer_qlen[from] = m->avg_qlen;
+
//if (!m->ack)
//messenger->send_message(new MOSDPing(osdmap->get_epoch(), true),
//m->get_source());
+
+ delete m;
}
eversion_t nv = op->get_version();
const char *opname = MOSDOp::get_opname(op->get_op());
+
+ // check crev
+ objectrev_t crev = 0;
+ store->getattr(oid, "crev", (char*)&crev, sizeof(crev));
+
dout(10) << "op_rep_modify " << opname
<< " " << oid
<< " v " << nv
int ackerosd = pg->acting[0];
if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) {
- ackerosd = pg->get_tail();
+ ackerosd = pg->get_acker();
- if (pg->is_tail()) {
+ if (pg->is_acker()) {
// i am tail acker.
if (pg->repop_gather.count(op->get_rep_tid())) {
repop = pg->repop_gather[ op->get_rep_tid() ];
}
// chain? forward?
- if (g_conf.osd_rep == OSD_REP_CHAIN && !pg->is_tail()) {
+ if (g_conf.osd_rep == OSD_REP_CHAIN && !pg->is_acker()) {
// chain rep, not at the tail yet.
int myrank = osdmap->calc_pg_rank(whoami, pg->acting);
- issue_repop(pg, op, pg->acting[myrank+1]);
+ int next = myrank+1;
+ if (next == (int)pg->acting.size())
+ next = 1;
+ issue_repop(pg, op, pg->acting[next]);
}
}
if (repop) {
// acker. we'll apply later.
if (op->get_op() != OSD_OP_WRNOOP) {
- prepare_log_transaction(repop->t, op, nv, pg, op->get_pg_trim_to());
- prepare_op_transaction(repop->t, op, nv, pg);
+ prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), pg, op->get_pg_trim_to());
+ prepare_op_transaction(repop->t, op, nv, crev, op->get_rev(), pg);
}
} else {
// middle|replica.
if (op->get_op() != OSD_OP_WRNOOP) {
- prepare_log_transaction(t, op, nv, pg, op->get_pg_trim_to());
- prepare_op_transaction(t, op, nv, pg);
+ prepare_log_transaction(t, op, nv, crev, op->get_rev(), pg, op->get_pg_trim_to());
+ prepare_op_transaction(t, op, nv, crev, op->get_rev(), pg);
}
oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, pg->info.last_complete);
logger->set("buf", buffer_total_alloc);
+ // update qlen stats
+ hb_stat_ops++;
+ hb_stat_qlen += pending_ops;
+
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
if (read) {
// read. am i the (same) acker?
- if (pg->get_acker() != whoami ||
+ if (//pg->get_acker() != whoami ||
op->get_map_epoch() < pg->info.history.same_acker_since) {
dout(7) << "acting acker is osd" << pg->get_acker()
<< " since " << pg->info.history.same_acker_since
}
// missing object?
- if (op->get_op() != OSD_OP_PUSH &&
- waitfor_missing_object(op, pg)) return;
+ if (read && op->get_oid().rev > 0) {
+ // versioned read. hrm.
+ // are we missing a revision that we might need?
+ object_t moid = op->get_oid();
+ if (pick_missing_object_rev(moid, pg)) {
+ // is there a local revision we might use instead?
+ object_t loid = op->get_oid();
+ if (store->pick_object_revision_lt(loid) &&
+ moid <= loid) {
+ // we need moid. pull it.
+ dout(10) << "handle_op read on " << op->get_oid()
+ << ", have " << loid
+ << ", but need missing " << moid
+ << ", pulling" << endl;
+ pull(pg, moid);
+ pg->waiting_for_missing_object[moid].push_back(op);
+ return;
+ }
+
+ dout(10) << "handle_op read on " << op->get_oid()
+ << ", have " << loid
+ << ", don't need missing " << moid
+ << endl;
+ }
+ } else {
+ // live revision. easy.
+ if (op->get_op() != OSD_OP_PUSH &&
+ waitfor_missing_object(op, pg)) return;
+ }
dout(7) << "handle_op " << *op << " in " << *pg << endl;
+
+ // balance reads?
+ if (read &&
+ g_conf.osd_balance_reads) {
+ // am i above my average?
+ float my_avg = hb_stat_qlen / hb_stat_ops;
+ if (pending_ops > my_avg) {
+ // is there a peer who is below my average?
+ for (unsigned i=1; i<pg->acting.size(); ++i) {
+ int peer = pg->acting[i];
+ if (peer_qlen.count(peer) &&
+ peer_qlen[peer] < my_avg) {
+ // take the first one
+ dout(-10) << "my qlen " << pending_ops << " > my_avg " << my_avg
+ << ", fwd to peer w/ qlen " << peer_qlen[peer]
+ << " osd" << peer
+ << endl;
+ messenger->send_message(op, MSG_ADDR_OSD(peer));
+ return;
+ }
+ }
+ }
+ }
+
} else {
// REPLICATION OP (it's from another OSD)
// ===============================
// OPS
+/*
+int OSD::list_missing_revs(object_t oid, set<object_t>& revs, PG *pg)
+{
+ int c = 0;
+ oid.rev = 0;
+
+ map<object_t,eversion_t>::iterator p = pg->missing.missing.lower_bound(oid);
+ if (p == pg->missing.missing.end())
+ return 0; // clearly not
+
+ while (p->first.ino == oid.ino &&
+ p->first.bno == oid.bno) {
+ revs.insert(p->first);
+ c++;
+ }
+ return c;
+}*/
+
+bool OSD::pick_missing_object_rev(object_t& oid, PG *pg)
+{
+ map<object_t,eversion_t>::iterator p = pg->missing.missing.upper_bound(oid);
+ if (p == pg->missing.missing.end())
+ return false; // clearly no candidate
+
+ if (p->first.ino == oid.ino && p->first.bno == oid.bno) {
+ oid = p->first; // yes! it's an upper bound revision for me.
+ return true;
+ }
+ return false;
+}
+
+bool OSD::pick_object_rev(object_t& oid)
+{
+ object_t t = oid;
+
+ if (!store->pick_object_revision_lt(t))
+ return false; // we have no revisions of this object!
+
+ objectrev_t crev;
+ int r = store->getattr(t, "crev", &crev, sizeof(crev));
+ assert(r >= 0);
+ if (crev <= oid.rev) {
+ dout(10) << "pick_object_rev choosing " << t << " crev " << crev << " for " << oid << endl;
+ oid = t;
+ return true;
+ }
+
+ return false;
+}
+
bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg)
{
const object_t oid = op->get_oid();
}
+
+
// READ OPS
/** op_read
*/
void OSD::op_read(MOSDOp *op)//, PG *pg)
{
- const object_t oid = op->get_oid();
+ object_t oid = op->get_oid();
// if the target object is locked for writing by another client, put 'op' to the waiting queue
// for _any_ op type -- eg only the locker can unlock!
//<< " in " << *pg
<< endl;
- // read into a buffer
+ long r = 0;
bufferlist bl;
- long got = store->read(oid,
- op->get_offset(), op->get_length(),
- bl);
+
+ if (oid.rev && !pick_object_rev(oid)) {
+ // we have no revision for this request.
+ r = -EEXIST;
+ } else {
+ // read into a buffer
+ r = store->read(oid,
+ op->get_offset(), op->get_length(),
+ bl);
+ }
+
// set up reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), true);
- if (got >= 0) {
+ if (r >= 0) {
reply->set_result(0);
reply->set_data(bl);
- reply->set_length(got);
+ reply->set_length(r);
logger->inc("c_rd");
- logger->inc("c_rdb", got);
+ logger->inc("c_rdb", r);
} else {
- reply->set_result(got); // error
+ reply->set_result(r); // error
reply->set_length(0);
}
- dout(10) << " read got " << got << " / " << op->get_length() << " bytes from obj " << oid << endl;
+ dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << endl;
logger->inc("rd");
- if (got >= 0) logger->inc("rdb", got);
+ if (r >= 0) logger->inc("rdb", r);
// send it
messenger->send_message(reply, op->get_client(), op->get_client_inst());
struct stat st;
memset(&st, sizeof(st), 0);
- int r = store->stat(oid, &st);
+ int r = 0;
+
+ if (oid.rev && !pick_object_rev(oid)) {
+ // we have no revision for this request.
+ r = -EEXIST;
+ } else {
+ r = store->stat(oid, &st);
+ }
dout(3) << "op_stat on " << oid
<< " r = " << r
int peer = pg->acting[i];
if (pg->peer_missing.count(peer) &&
pg->peer_missing[peer].is_missing(oid)) {
- // push it before this update. FIXME, this is probably extra much work (eg if we're about to overwrite)
+ // push it before this update.
+ // FIXME, this is probably extra much work (eg if we're about to overwrite)
pg->peer_missing[peer].got(oid);
push(pg, oid, peer);
}
return; // op will be handled later, after the object unlocks
+ // check crev
+ objectrev_t crev = 0;
+ store->getattr(oid, "crev", (char*)&crev, sizeof(crev));
+
// assign version
+ eversion_t clone_version;
eversion_t nv = pg->log.top;
if (op->get_op() != OSD_OP_WRNOOP) {
nv.epoch = osdmap->get_epoch();
assert(nv > pg->info.last_update);
assert(nv > pg->log.top);
+ // will clone?
+ if (crev && op->get_rev() && op->get_rev() > crev) {
+ clone_version = nv;
+ nv.version++;
+ }
+
if (op->get_version().version) {
- // replay
- if (nv.version < op->get_version().version)
+ // replay!
+ if (nv.version < op->get_version().version) {
nv.version = op->get_version().version;
- }
+
+ // clone?
+ if (crev && op->get_rev() && op->get_rev() > crev) {
+ // backstep clone
+ clone_version = nv;
+ clone_version.version--;
+ }
+ }
+ }
}
// set version in op, for benefit of client and our eventual reply
dout(10) << "op_modify " << opname
<< " " << oid
<< " v " << nv
+ << " crev " << crev
+ << " rev " << op->get_rev()
<< " " << op->get_offset() << "~" << op->get_length()
<< endl;
if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) {
// chain rep. send to #2 only.
- issue_repop(pg, op, pg->acting[1]);
+ int next = pg->acting[1];
+ if (pg->acting.size() > 2)
+ next = pg->acting[2];
+ issue_repop(pg, op, next);
}
else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
// splay rep. send to rest.
// we are acker.
if (op->get_op() != OSD_OP_WRNOOP) {
// log and update later.
- prepare_log_transaction(repop->t, op, nv, pg, pg->peers_complete_thru);
- prepare_op_transaction(repop->t, op, nv, pg);
+ prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), pg, pg->peers_complete_thru);
+ prepare_op_transaction(repop->t, op, nv, crev, op->get_rev(), pg);
}
// (logical) local ack.
} else {
// chain or splay. apply.
ObjectStore::Transaction t;
- prepare_log_transaction(t, op, nv, pg, pg->peers_complete_thru);
- prepare_op_transaction(t, op, nv, pg);
+ prepare_log_transaction(t, op, nv, crev, op->get_rev(), pg, pg->peers_complete_thru);
+ prepare_op_transaction(t, op, nv, crev, op->get_rev(), pg);
- C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, pg->get_tail(),
+ C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, pg->get_acker(),
pg->info.last_complete);
unsigned r = store->apply_transaction(t, oncommit);
if (r != 0 && // no errors
void OSD::prepare_log_transaction(ObjectStore::Transaction& t,
- MOSDOp *op, eversion_t& version, PG *pg,
+ MOSDOp *op, eversion_t& version,
+ objectrev_t crev, objectrev_t rev,
+ PG *pg,
eversion_t trim_to)
{
const object_t oid = op->get_oid();
-
- int opcode = PG::Log::Entry::UPDATE;
+
+ // clone entry?
+ if (crev && rev && rev > crev) {
+ eversion_t cv = version;
+ cv.version--;
+ PG::Log::Entry cloneentry(PG::Log::Entry::CLONE, oid, cv,
+ op->get_client(), op->get_tid());
+ pg->log.add(cloneentry);
+
+ dout(10) << "prepare_log_transaction " << op->get_op()
+ << " " << cloneentry
+ << " in " << *pg << endl;
+ }
+
+ // actual op
+ int opcode = PG::Log::Entry::MODIFY;
if (op->get_op() == OSD_OP_DELETE) opcode = PG::Log::Entry::DELETE;
PG::Log::Entry logentry(opcode, oid, version,
op->get_client(), op->get_tid());
dout(10) << "prepare_log_transaction " << op->get_op()
<< " " << logentry
- // << (logentry.is_delete() ? " - ":" + ")
- //<< oid
- << " v " << version
<< " in " << *pg << endl;
// append to log
* apply an op to the store wrapped in a transaction.
*/
void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
- MOSDOp *op, eversion_t& version, PG *pg)
+ MOSDOp *op, eversion_t& version,
+ objectrev_t crev, objectrev_t rev,
+ PG *pg)
{
const object_t oid = op->get_oid();
const pg_t pgid = op->get_pg();
+ bool did_clone = false;
+
dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op->get_op() )
<< " " << oid
<< " v " << version
+ << " crev " << crev
+ << " rev " << rev
<< " in " << *pg << endl;
// WRNOOP does nothing.
// write pg info
t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+ // clone?
+ if (crev && rev && rev > crev) {
+ object_t noid = oid;
+ noid.rev = rev;
+ dout(10) << "prepare_op_transaction cloning " << oid << " crev " << crev << " to " << noid << endl;
+ t.clone(oid, noid);
+ did_clone = true;
+ }
+
// apply the op
switch (op->get_op()) {
case OSD_OP_WRLOCK:
// object version
t.setattr(oid, "version", &version, sizeof(version));
+
+ // set object crev
+ if (crev == 0 || // new object
+ did_clone) // we cloned
+ t.setattr(oid, "crev", &rev, sizeof(rev));
}
}