#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGRemove.h"
bool ReplicatedPG::same_for_rep_modify_since(epoch_t e)
{
// check osd map: same set, or primary+acker?
-
- if (g_conf.osd_rep == OSD_REP_CHAIN) {
- return e >= info.history.same_since; // whole pg set same
- } else {
- // primary, splay
- return (e >= info.history.same_primary_since &&
- e >= info.history.same_acker_since);
- }
+ return (e >= info.history.same_primary_since &&  // true only if e is not older than the recorded same_primary_since epoch...
+ e >= info.history.same_acker_since);  // ...and not older than the recorded same_acker_since epoch
}
// ====================
}
-void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op)
+void ReplicatedPG::wait_for_missing_object(object_t oid, Message *m)
{
assert(is_missing_object(oid));
<< dendl;
pull(oid);
}
- waiting_for_missing_object[oid].push_back(op);
+ waiting_for_missing_object[oid].push_back(m);
}
-
+// ==========================================================
/** preprocess_op - preprocess an op (before it gets queued).
* fasttrack read
op_read(op);
break;
+ // writes
+ case CEPH_OSD_OP_WRNOOP:
+ case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_ZERO:
+ case CEPH_OSD_OP_DELETE:
+ case CEPH_OSD_OP_TRUNCATE:
+ case CEPH_OSD_OP_WRLOCK:
+ case CEPH_OSD_OP_WRUNLOCK:
+ case CEPH_OSD_OP_RDLOCK:
+ case CEPH_OSD_OP_RDUNLOCK:
+ case CEPH_OSD_OP_UPLOCK:
+ case CEPH_OSD_OP_DNLOCK:
+ case CEPH_OSD_OP_BALANCEREADS:
+ case CEPH_OSD_OP_UNBALANCEREADS:
+ op_modify(op);
+ break;
+
+ default:
+ assert(0);
+ }
+}
+
+
+void ReplicatedPG::do_sub_op(MOSDSubOp *op)
+{
+ dout(15) << "do_sub_op " << *op << dendl;
+
+ osd->logger->inc("subop");
+
+ switch (op->get_op()) {
+
// rep stuff
case CEPH_OSD_OP_PULL:
- op_pull(op);
+ sub_op_pull(op);
break;
case CEPH_OSD_OP_PUSH:
- op_push(op);
+ sub_op_push(op);
break;
// writes
case CEPH_OSD_OP_DNLOCK:
case CEPH_OSD_OP_BALANCEREADS:
case CEPH_OSD_OP_UNBALANCEREADS:
- if (op->get_source().is_osd()) {
- op_rep_modify(op);
- } else {
- // go go gadget pg
- op_modify(op);
- }
+ sub_op_modify(op);
break;
default:
assert(0);
}
+
}
-void ReplicatedPG::do_op_reply(MOSDOpReply *r)
+void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)  // dispatch a sub-op reply according to the op code it carries
{
if (r->get_op() == CEPH_OSD_OP_PUSH) {
// continue peer recovery
- op_push_reply(r);
+ sub_op_push_reply(r);  // the push-reply handler takes over r and frees it
} else {
- // must be replication.
- tid_t rep_tid = r->get_rep_tid();
- int fromosd = r->get_source().num();
-
- osd->take_peer_stat(fromosd, r->get_peer_stat());
-
- if (rep_gather.count(rep_tid)) {
- // oh, good.
- repop_ack(rep_gather[rep_tid],
- r->get_result(), r->get_commit(),
- fromosd,
- r->get_pg_complete_thru());
- delete r;
- } else {
- // early ack.
- waiting_for_repop[rep_tid].push_back(r);
- }
+ sub_op_modify_reply(r);  // any other op code is treated as a replicated-modify reply; that handler frees or queues r
}
}
-
// ========================================================================
// READS
// MODIFY
void ReplicatedPG::prepare_log_transaction(ObjectStore::Transaction& t,
- MOSDOp *op, eversion_t& version,
+ osdreqid_t reqid, pobject_t poid, int op, eversion_t version,
objectrev_t crev, objectrev_t rev,
eversion_t trim_to)
{
- const object_t oid = op->get_oid();
-
// clone entry?
if (crev && rev && rev > crev) {
eversion_t cv = version;
cv.version--;
- Log::Entry cloneentry(PG::Log::Entry::CLONE, oid, cv, op->get_reqid());
+ Log::Entry cloneentry(PG::Log::Entry::CLONE, poid.oid, cv, reqid);
log.add(cloneentry);
- dout(10) << "prepare_log_transaction " << op->get_op()
+ dout(10) << "prepare_log_transaction " << op
<< " " << cloneentry
<< dendl;
}
// actual op
int opcode = Log::Entry::MODIFY;
- if (op->get_op() == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
- Log::Entry logentry(opcode, oid, version, op->get_reqid());
+ if (op == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
+ Log::Entry logentry(opcode, poid.oid, version, reqid);
- dout(10) << "prepare_log_transaction " << op->get_op()
+ dout(10) << "prepare_log_transaction " << op
<< " " << logentry
<< dendl;
/** prepare_op_transaction
* apply an op to the store wrapped in a transaction.
*/
-void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
- MOSDOp *op, eversion_t& version,
- objectrev_t crev, objectrev_t rev)
+void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, const osdreqid_t& reqid,
+ pg_t pgid, int op, pobject_t poid,
+ off_t offset, off_t length, bufferlist& bl,
+ eversion_t& version, objectrev_t crev, objectrev_t rev)
{
- const object_t oid = op->get_oid();
- const pg_t pgid = op->get_pg();
-
bool did_clone = false;
- dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op->get_op() )
- << " " << oid
+ dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op )
+ << " " << poid
<< " v " << version
<< " crev " << crev
<< " rev " << rev
<< dendl;
// WRNOOP does nothing.
- if (op->get_op() == CEPH_OSD_OP_WRNOOP)
+ if (op == CEPH_OSD_OP_WRNOOP)
return;
// raise last_complete?
// clone?
if (crev && rev && rev > crev) {
- object_t noid = oid;
- noid.rev = rev;
- dout(10) << "prepare_op_transaction cloning " << oid << " crev " << crev << " to " << noid << dendl;
- t.clone(oid, noid);
+ assert(0);
+ pobject_t noid = poid; // FIXME ****
+ noid.oid.rev = rev;
+ dout(10) << "prepare_op_transaction cloning " << poid << " crev " << crev << " to " << noid << dendl;
+ t.clone(poid, noid);
did_clone = true;
}
// apply the op
- switch (op->get_op()) {
+ switch (op) {
// -- locking --
case CEPH_OSD_OP_WRLOCK:
{ // lock object
- t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
+ t.setattr(poid, "wrlock", &reqid.name, sizeof(entity_name_t));
}
break;
case CEPH_OSD_OP_WRUNLOCK:
{ // unlock objects
- t.rmattr(oid, "wrlock");
+ t.rmattr(poid, "wrlock");
}
break;
case CEPH_OSD_OP_MININCLOCK:
{
- uint32_t mininc = op->get_length();
- t.setattr(oid, "mininclock", &mininc, sizeof(mininc));
+ uint32_t mininc = length;
+ t.setattr(poid, "mininclock", &mininc, sizeof(mininc));
}
break;
case CEPH_OSD_OP_BALANCEREADS:
{
bool bal = true;
- t.setattr(oid, "balance-reads", &bal, sizeof(bal));
+ t.setattr(poid, "balance-reads", &bal, sizeof(bal));
}
break;
case CEPH_OSD_OP_UNBALANCEREADS:
{
- t.rmattr(oid, "balance-reads");
+ t.rmattr(poid, "balance-reads");
}
break;
case CEPH_OSD_OP_WRITE:
{ // write
- assert(op->get_data().length() == op->get_length());
- bufferlist bl;
- bl.claim( op->get_data() ); // give buffers to store; we keep *op in memory for a long time!
-
- //if (oid < 100000000000000ULL) // hack hack-- don't write client data
- t.write( oid, op->get_offset(), op->get_length(), bl );
+ assert(bl.length() == length);
+ bufferlist nbl;
+ nbl.claim(bl); // give buffers to store; we keep *op in memory for a long time!
+ t.write(poid, offset, length, nbl);
}
break;
{
// zero, remove, or truncate?
struct stat st;
- int r = osd->store->stat(oid, &st);
+ int r = osd->store->stat(poid, &st);
if (r >= 0) {
- if (op->get_length() == 0 ||
- op->get_offset() + (off_t)op->get_length() >= (off_t)st.st_size) {
- if (op->get_offset())
- t.truncate(oid, op->get_length() + op->get_offset());
- else
- t.remove(oid);
- } else {
- // zero. the dumb way. FIXME.
- bufferptr bp(op->get_length());
- bp.zero();
- bufferlist bl;
- bl.push_back(bp);
- t.write(oid, op->get_offset(), op->get_length(), bl);
- }
+ if (offset == 0 && offset + length >= (off_t)st.st_size)
+ t.remove(poid);
+ else
+ t.zero(poid, offset, length);
} else {
// noop?
- dout(10) << "apply_transaction zero on " << oid << ", but dne? stat returns " << r << dendl;
+ dout(10) << "apply_transaction zero on " << poid << ", but dne? stat returns " << r << dendl;
}
}
break;
case CEPH_OSD_OP_TRUNCATE:
{ // truncate
- t.truncate(oid, op->get_length() );
+ t.truncate(poid, length);
}
break;
case CEPH_OSD_OP_DELETE:
{ // delete
- t.remove(oid);
+ t.remove(poid);
}
break;
}
// object collection, version
- if (op->get_op() == CEPH_OSD_OP_DELETE) {
+ if (op == CEPH_OSD_OP_DELETE) {
// remove object from c
- t.collection_remove(pgid, oid);
+ t.collection_remove(pgid, poid);
} else {
// add object to c
- t.collection_add(pgid, oid);
+ t.collection_add(pgid, poid);
// object version
- t.setattr(oid, "version", &version, sizeof(version));
+ t.setattr(poid, "version", &version, sizeof(version));
// set object crev
if (crev == 0 || // new object
did_clone) // we cloned
- t.setattr(oid, "crev", &rev, sizeof(rev));
+ t.setattr(poid, "crev", &rev, sizeof(rev));
}
}
}
-void ReplicatedPG::issue_repop(MOSDOp *op, int dest, utime_t now)
+void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)  // send osd 'dest' a MOSDSubOp built from repop's client op
{
- object_t oid = op->get_oid();
-
- dout(7) << " issue_repop rep_tid " << op->get_rep_tid()
- << " o " << oid
+ pobject_t poid = repop->op->get_oid();  // NOTE(review): get_oid() was stored as object_t before this change; this relies on an object_t -> pobject_t conversion - confirm
+ dout(7) << " issue_repop rep_tid " << repop->rep_tid  // rep_tid now comes from the gather, not from the client op
+ << " o " << poid
<< " to osd" << dest
<< dendl;
// forward the write/update/whatever
- MOSDOp *wr = new MOSDOp(op->get_client_inst(), op->get_client_inc(), op->get_reqid().tid,
- oid,
- ObjectLayout(info.pgid),
- osd->osdmap->get_epoch(),
- op->get_op());
- wr->get_data() = op->get_data(); // _copy_ bufferlist
- wr->set_length(op->get_length());
- wr->set_offset(op->get_offset());
- wr->set_version(op->get_version());
-
- wr->set_rep_tid(op->get_rep_tid());
+ MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, poid,  // request id is taken from the client op; pg id is this PG's
+ repop->op->get_op(),  // same op code as the client op
+ repop->op->get_offset(), repop->op->get_length(),  // same extent as the client op
+ osd->osdmap->get_epoch(),  // epoch of the osdmap currently held by this osd
+ repop->rep_tid, repop->new_version);  // tagged with the gather's rep_tid and new_version
+ wr->get_data() = repop->op->get_data(); // _copy_ bufferlist
wr->set_pg_trim_to(peers_complete_thru);
-
wr->set_peer_stat(osd->get_my_stat_for(now, dest));
-
osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
}
-ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op)
+ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv)
{
dout(10) << "new_rep_gather rep_tid " << op->get_rep_tid() << " on " << *op << dendl;
- int whoami = osd->get_nodeid();
-
- RepGather *repop = new RepGather(op, op->get_rep_tid(),
- op->get_version(),
- info.last_complete);
+ RepGather *repop = new RepGather(op, rep_tid, nv, info.last_complete);
// osds. commits all come to me.
for (unsigned i=0; i<acting.size(); i++) {
repop->waitfor_commit.insert(osd);
}
- // acks vary:
- if (g_conf.osd_rep == OSD_REP_CHAIN) {
- // chain rep.
- // there's my local ack...
- repop->osds.insert(whoami);
- repop->waitfor_ack.insert(whoami);
- repop->waitfor_commit.insert(whoami);
-
- // also, the previous guy will ack to me
- int myrank = osd->osdmap->calc_pg_rank(whoami, acting);
- if (myrank > 0) {
- int osd = acting[ myrank-1 ];
- repop->osds.insert(osd);
- repop->waitfor_ack.insert(osd);
- repop->waitfor_commit.insert(osd);
- }
- } else {
- // primary, splay. all osds ack to me.
- for (unsigned i=0; i<acting.size(); i++) {
- int osd = acting[i];
- repop->waitfor_ack.insert(osd);
- }
+ // primary. all osds ack to me.
+ for (unsigned i=0; i<acting.size(); i++) {
+ int osd = acting[i];
+ repop->waitfor_ack.insert(osd);
}
-
+
repop->start = g_clock.now();
- rep_gather[ repop->rep_tid ] = repop;
+ rep_gather[repop->rep_tid] = repop;
// anyone waiting? (acks that got here before the op did)
if (waiting_for_repop.count(repop->rep_tid)) {
class C_OSD_RepModifyCommit : public Context {
public:
ReplicatedPG *pg;
- MOSDOp *op;
+ MOSDSubOp *op;
int destosd;
eversion_t pg_last_complete;
bool acked;
bool waiting;
- C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDOp *oo, int dosd, eversion_t lc) :
+ C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) :
pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
acked(false), waiting(false) {
pg->get(); // we're copying the pointer.
lock.Unlock();
pg->lock();
- pg->op_rep_modify_commit(op, destosd, pg_last_complete);
+ pg->sub_op_modify_commit(op, destosd, pg_last_complete);
pg->put_unlock();
}
void ack() {
utime_t now = g_clock.now();
// issue replica writes
- RepGather *repop = 0;
- bool alone = (acting.size() == 1);
tid_t rep_tid = osd->get_tid();
- op->set_rep_tid(rep_tid);
-
- if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) {
- // chain rep. send to #2 only.
- int next = acting[1];
- if (acting.size() > 2)
- next = acting[2];
- issue_repop(op, next, now);
- }
- else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
- // splay rep. send to rest.
- for (unsigned i=1; i<acting.size(); ++i)
- //for (unsigned i=acting.size()-1; i>=1; --i)
- issue_repop(op, acting[i], now);
- } else {
- // primary rep, or alone.
- repop = new_rep_gather(op);
+ RepGather *repop = new_rep_gather(op, rep_tid, nv);
+ for (unsigned i=1; i<acting.size(); i++)
+ issue_repop(repop, acting[i], now);
- // send to rest.
- if (!alone)
- for (unsigned i=1; i<acting.size(); i++)
- issue_repop(op, acting[i], now);
+ // we are acker.
+ if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+ // log and update later.
+ pobject_t poid = oid;
+ prepare_log_transaction(repop->t, op->get_reqid(), poid, op->get_op(), nv,
+ crev, op->get_rev(), peers_complete_thru);
+ prepare_op_transaction(repop->t, op->get_reqid(),
+ info.pgid, op->get_op(), poid,
+ op->get_offset(), op->get_length(), op->get_data(),
+ nv, crev, op->get_rev());
}
-
- if (repop) {
- // we are acker.
- if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
- // log and update later.
- prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), peers_complete_thru);
- prepare_op_transaction(repop->t, op, nv, crev, op->get_rev());
- }
-
- // (logical) local ack.
- // (if alone, this will apply the update.)
- get_rep_gather(repop);
- {
- assert(repop->waitfor_ack.count(whoami));
- repop->waitfor_ack.erase(whoami);
- }
- put_rep_gather(repop);
-
- } else {
- // not acker.
- // chain or splay. apply.
- ObjectStore::Transaction t;
- prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru);
- prepare_op_transaction(t, op, nv, crev, op->get_rev());
-
- C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, get_acker(),
- info.last_complete);
- unsigned r = osd->store->apply_transaction(t, oncommit);
- if (r != 0 && // no errors
- r != 2) { // or error on collection_add
- derr(0) << "error applying transaction: r = " << r << dendl;
- assert(r == 0);
- }
-
- // lets evict the data from our cache to maintain a total large cache size
- if (g_conf.osd_exclusive_caching)
- osd->store->trim_from_cache(op->get_oid(), op->get_offset(), op->get_length());
-
- oncommit->ack();
+
+ // (logical) local ack.
+ // (if alone, this will apply the update.)
+ get_rep_gather(repop);
+ {
+ assert(repop->waitfor_ack.count(whoami));
+ repop->waitfor_ack.erase(whoami);
}
-
+ put_rep_gather(repop);
}
-// replicated
-
-
-
+// sub op modify
-void ReplicatedPG::op_rep_modify(MOSDOp *op)
+void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
{
- object_t oid = op->get_oid();
+ pobject_t poid = op->get_poid();
eversion_t nv = op->get_version();
const char *opname = MOSDOp::get_opname(op->get_op());
// check crev
objectrev_t crev = 0;
- osd->store->getattr(oid, "crev", (char*)&crev, sizeof(crev));
+ osd->store->getattr(poid, "crev", (char*)&crev, sizeof(crev));
- dout(10) << "op_rep_modify " << opname
- << " " << oid
+ dout(10) << "sub_op_modify " << opname
+ << " " << poid
<< " v " << nv
<< " " << op->get_offset() << "~" << op->get_length()
<< dendl;
osd->take_peer_stat(fromosd, op->get_peer_stat());
// we better not be missing this.
- assert(!missing.is_missing(oid));
+ assert(!missing.is_missing(poid.oid));
// prepare our transaction
ObjectStore::Transaction t;
- // am i acker?
- RepGather *repop = 0;
+ // do op
int ackerosd = acting[0];
-
- if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) {
- ackerosd = get_acker();
-
- if (is_acker()) {
- // i am tail acker.
- if (rep_gather.count(op->get_rep_tid())) {
- repop = rep_gather[ op->get_rep_tid() ];
- } else {
- repop = new_rep_gather(op);
- }
-
- // infer ack from source
- get_rep_gather(repop);
- {
- //assert(repop->waitfor_ack.count(fromosd)); // no, we may come thru here twice.
- repop->waitfor_ack.erase(fromosd);
- }
- put_rep_gather(repop);
-
- // prepare dest socket
- //messenger->prepare_send_message(op->get_client());
- }
-
- // chain? forward?
- if (g_conf.osd_rep == OSD_REP_CHAIN && !is_acker()) {
- // chain rep, not at the tail yet.
- int myrank = osd->osdmap->calc_pg_rank(osd->get_nodeid(), acting);
- int next = myrank+1;
- if (next == (int)acting.size())
- next = 1;
- issue_repop(op, acting[next], g_clock.now());
- }
- }
-
- // do op?
- C_OSD_RepModifyCommit *oncommit = 0;
-
osd->logger->inc("r_wr");
osd->logger->inc("r_wrb", op->get_length());
- if (repop) {
- // acker. we'll apply later.
- if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
- prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), op->get_pg_trim_to());
- prepare_op_transaction(repop->t, op, nv, crev, op->get_rev());
- }
- } else {
- // middle|replica.
- if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
- prepare_log_transaction(t, op, nv, crev, op->get_rev(), op->get_pg_trim_to());
- prepare_op_transaction(t, op, nv, crev, op->get_rev());
- }
-
- oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
-
- // apply log update. and possibly update itself.
- unsigned tr = osd->store->apply_transaction(t, oncommit);
- if (tr != 0 && // no errors
- tr != 2) { // or error on collection_add
- derr(0) << "error applying transaction: r = " << tr << dendl;
- assert(tr == 0);
- }
+ if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+ prepare_log_transaction(t, op->get_reqid(), op->get_poid(), op->get_op(), op->get_version(),
+ crev, 0, op->get_pg_trim_to());
+ prepare_op_transaction(t, op->get_reqid(),
+ info.pgid, op->get_op(), poid,
+ op->get_offset(), op->get_length(), op->get_data(),
+ nv, crev, 0);
}
- // ack?
- if (repop) {
- // (logical) local ack. this may induce the actual update.
- get_rep_gather(repop);
- {
- assert(repop->waitfor_ack.count(osd->get_nodeid()));
- repop->waitfor_ack.erase(osd->get_nodeid());
- }
- put_rep_gather(repop);
- }
- else {
- // send ack to acker?
- if (g_conf.osd_rep != OSD_REP_CHAIN) {
- MOSDOpReply *ack = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), false);
- ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd));
- osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd));
- }
-
- // ack myself.
- assert(oncommit);
- oncommit->ack();
+ C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
+
+ // apply log update. and possibly update itself.
+ unsigned tr = osd->store->apply_transaction(t, oncommit);
+ if (tr != 0 && // no errors
+ tr != 2) { // or error on collection_add
+ derr(0) << "error applying transaction: r = " << tr << dendl;
+ assert(tr == 0);
}
-
+
+ // send ack to acker
+ MOSDSubOpReply *ack = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), false);
+ ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd));
+ osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd));
+
+ // ack myself.
+ oncommit->ack();
}
-
-void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete)
+void ReplicatedPG::sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete)  // report the commit of sub-op 'op' to ackerosd; nothing is sent unless the osdmap shows ackerosd up
{
// send commit.
dout(10) << "rep_modify_commit on op " << *op
<< ", sending commit to osd" << ackerosd
<< dendl;
if (osd->osdmap->is_up(ackerosd)) {
- MOSDOpReply *commit = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
+ MOSDSubOpReply *commit = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), true);  // result 0; trailing 'true' is presumably the commit flag (the ack sent from sub_op_modify passes false) - confirm
commit->set_pg_complete_thru(last_complete);
commit->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd));
osd->messenger->send_message(commit, osd->osdmap->get_inst(ackerosd));
}
}
+void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
+{
+ /* Handle a reply to a replicated-modify sub-op
+  * (do_sub_op_reply routes every non-PUSH reply here).
+  *
+  * The sender's peer stat is recorded first.  The reply is then matched
+  * to its RepGather by rep_tid: when one is registered, the reply is fed
+  * to repop_ack() and freed; otherwise the message is parked on
+  * waiting_for_repop[rep_tid] and is NOT freed here.
+  */
+ tid_t rep_tid = r->get_rep_tid();
+ int fromosd = r->get_source().num();
+
+ osd->take_peer_stat(fromosd, r->get_peer_stat());
+
+ if (rep_gather.count(rep_tid)) {
+ // oh, good: a gather is registered under this rep_tid.
+ repop_ack(rep_gather[rep_tid],
+ r->get_result(), r->get_commit(),
+ fromosd,
+ r->get_pg_complete_thru());
+ delete r;
+ } else {
+ // early ack: no gather is registered under this rep_tid yet, so keep the message for later.
+ waiting_for_repop[rep_tid].push_back(r);
+ }
+}
+
/** pull - request object from a peer
*/
-void ReplicatedPG::pull(object_t oid)
+void ReplicatedPG::pull(pobject_t poid)  // ask the osd recorded in missing.loc to send us the object; bookkeeping is keyed by poid.oid
{
- assert(missing.loc.count(oid));
- eversion_t v = missing.missing[oid];
- int fromosd = missing.loc[oid];
+ assert(missing.loc.count(poid.oid));  // a source location must already be recorded for this object
+ eversion_t v = missing.missing[poid.oid];  // version recorded in the missing set
+ int fromosd = missing.loc[poid.oid];  // osd to request it from
- dout(7) << "pull " << oid
+ dout(7) << "pull " << poid
<< " v " << v
<< " from osd" << fromosd
<< dendl;
// send op
+ osdreqid_t rid;  // default-constructed request id (presumably unused for pulls - confirm)
tid_t tid = osd->get_tid();
- MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, tid,
- oid, info.pgid,
- osd->osdmap->get_epoch(),
- CEPH_OSD_OP_PULL);
- op->set_version(v);
- osd->messenger->send_message(op, osd->osdmap->get_inst(fromosd));
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PULL,
+ 0, 0,  // offset and length slots (same positions issue_repop fills with get_offset()/get_length())
+ osd->osdmap->get_epoch(), tid, v);  // fresh tid from the osd; v is the version being requested
+ osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd));
// take note
- assert(objects_pulling.count(oid) == 0);
+ assert(objects_pulling.count(poid.oid) == 0);  // must not already be pulling this object
num_pulling++;
- objects_pulling[oid] = v;
+ objects_pulling[poid.oid] = v;  // remember which version is being pulled
}
/** push - send object to a peer
*/
-void ReplicatedPG::push(object_t oid, int peer)
+void ReplicatedPG::push(pobject_t poid, int peer)
{
// read data+attrs
bufferlist bl;
map<string,bufferptr> attrset;
ObjectStore::Transaction t;
- t.read(oid, 0, 0, &bl);
- t.getattr(oid, "version", &v, &vlen);
- t.getattrs(oid, attrset);
+ t.read(poid, 0, 0, &bl);
+ t.getattr(poid, "version", &v, &vlen);
+ t.getattrs(poid, attrset);
unsigned tr = osd->store->apply_transaction(t);
assert(tr == 0); // !!!
// ok
- dout(7) << "push " << oid << " v " << v
+ dout(7) << "push " << poid << " v " << v
<< " size " << bl.length()
<< " to osd" << peer
<< dendl;
osd->logger->inc("r_pushb", bl.length());
// send
- MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
- oid, info.pgid, osd->osdmap->get_epoch(),
- CEPH_OSD_OP_PUSH);
- op->set_offset(0);
- op->set_length(bl.length());
- op->set_data(bl); // note: claims bl, set length above here!
- op->set_version(v);
- op->set_attrset(attrset);
-
- osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
+ osdreqid_t rid; // useless?
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, bl.length(),
+ osd->osdmap->get_epoch(), osd->get_tid(), v);
+ subop->set_data(bl); // note: claims bl, set length above here!
+ subop->set_attrset(attrset);
+ osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
if (is_primary()) {
- peer_missing[peer].got(oid);
- pushing[oid].insert(peer);
+ peer_missing[peer].got(poid.oid);
+ pushing[poid.oid].insert(peer);
}
}
+void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)  // handle a peer's reply to an object push (reached from do_sub_op_reply for CEPH_OSD_OP_PUSH)
+{
+ dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+/* Bookkeeping below is keyed by poid.oid in `pushing`, the per-object set of
+ * peers that push() filled in.  If the replying peer is in that set it is
+ * removed; the peer is added to uptodate_set when peer_missing holds nothing
+ * for it; and once the set for this object is empty do_peer_recovery() is
+ * called.  The emptied entry itself is not erased from `pushing` here.
+ * A reply that matches no recorded push is only logged.
+ */
+ int peer = reply->get_source().num();
+ pobject_t poid = reply->get_poid();
+
+ if (pushing.count(poid.oid) &&
+ pushing[poid.oid].count(peer)) {
+ pushing[poid.oid].erase(peer);  // this peer has answered the push
+
+ if (peer_missing.count(peer) == 0 ||
+ peer_missing[peer].num_missing() == 0)
+ uptodate_set.insert(peer);  // nothing (left) recorded as missing on this peer
+
+ if (pushing[poid.oid].empty()) {
+ dout(10) << "pushed " << poid << " to all replicas" << dendl;
+ do_peer_recovery();  // no replies outstanding for this object; continue peer recovery
+ } else {
+ dout(10) << "pushed " << poid << ", still waiting for push ack from "
+ << pushing[poid.oid] << dendl;
+ }
+ } else {
+ dout(10) << "huh, i wasn't pushing " << poid << dendl;  // no record of pushing this object to this peer
+ }
+ delete reply;  // the reply message is freed on every path
+}
/** op_pull
* process request to pull an entire object.
* NOTE: called from opqueue.
*/
-void ReplicatedPG::op_pull(MOSDOp *op)
+void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
{
- const object_t oid = op->get_oid();
+ const pobject_t poid = op->get_poid();
const eversion_t v = op->get_version();
int from = op->get_source().num();
- dout(7) << "op_pull " << oid << " v " << op->get_version()
+ dout(7) << "op_pull " << poid << " v " << op->get_version()
<< " from " << op->get_source()
<< dendl;
// primary
assert(peer_missing.count(from)); // we had better know this, from the peering process.
- if (!peer_missing[from].is_missing(oid)) {
+ if (!peer_missing[from].is_missing(poid.oid)) {
dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << dendl;
delete op;
return;
}
// do we have it yet?
- if (is_missing_object(oid)) {
- wait_for_missing_object(oid, op);
+ if (is_missing_object(poid.oid)) {
+ wait_for_missing_object(poid.oid, op);
return;
}
} else {
// non-primary
- if (missing.is_missing(oid)) {
- dout(7) << "op_pull not primary, and missing " << oid << ", ignoring" << dendl;
+ if (missing.is_missing(poid.oid)) {
+ dout(7) << "op_pull not primary, and missing " << poid << ", ignoring" << dendl;
delete op;
return;
}
}
// push it back!
- push(oid, op->get_source().num());
+ push(poid, op->get_source().num());
}
/** op_push
* NOTE: called from opqueue.
*/
-void ReplicatedPG::op_push(MOSDOp *op)
+void ReplicatedPG::sub_op_push(MOSDSubOp *op)
{
- object_t oid = op->get_oid();
+ pobject_t poid = op->get_poid();
eversion_t v = op->get_version();
- if (!is_missing_object(oid)) {
- dout(7) << "op_push not missing " << oid << dendl;
+ if (!is_missing_object(poid.oid)) {
+ dout(7) << "sub_op_push not missing " << poid << dendl;
return;
}
dout(7) << "op_push "
- << oid
+ << poid
<< " v " << v
<< " size " << op->get_length() << " " << op->get_data().length()
<< dendl;
// write object and add it to the PG
ObjectStore::Transaction t;
- t.remove(oid); // in case old version exists
- t.write(oid, 0, op->get_length(), op->get_data());
- t.setattrs(oid, op->get_attrset());
- t.collection_add(info.pgid, oid);
+ t.remove(poid); // in case old version exists
+ t.write(poid, 0, op->get_length(), op->get_data());
+ t.setattrs(poid, op->get_attrset());
+ t.collection_add(info.pgid, poid);
// close out pull op?
num_pulling--;
- if (objects_pulling.count(oid))
- objects_pulling.erase(oid);
- missing.got(oid, v);
+ if (objects_pulling.count(poid.oid))
+ objects_pulling.erase(poid.oid);
+ missing.got(poid.oid, v);
// raise last_complete?
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
assert(peer_missing.count(peer));
- if (peer_missing[peer].is_missing(oid))
- push(oid, peer); // ok, push it, and they (will) have it now.
+ if (peer_missing[peer].is_missing(poid.oid))
+ push(poid, peer); // ok, push it, and they (will) have it now.
}
}
// kick waiters
- if (waiting_for_missing_object.count(oid)) {
- osd->take_waiters(waiting_for_missing_object[oid]);
- waiting_for_missing_object.erase(oid);
+ if (waiting_for_missing_object.count(poid.oid)) {
+ osd->take_waiters(waiting_for_missing_object[poid.oid]);
+ waiting_for_missing_object.erase(poid.oid);
}
if (is_primary()) {
do_recovery();
} else {
// ack if i'm a replica and being pushed to.
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
+ MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), false);
osd->messenger->send_message(reply, op->get_source_inst());
}
-
-
+/*
+ * pg status change notification
+ */
void ReplicatedPG::on_osd_failure(int o)
{
repop_ack(*p, -1, true, o);
}
-
void ReplicatedPG::on_acker_change()
{
dout(10) << "on_acker_change" << dendl;
{
dout(10) << "on_change" << dendl;
- if (g_conf.osd_rep == OSD_REP_PRIMARY ||
- g_conf.osd_rep == OSD_REP_SPLAY) {
- // apply all local repops
- // (pg is inactive; we will repeer)
- for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
- p != rep_gather.end();
- p++)
- if (!p->second->applied)
- apply_repop(p->second);
- }
- else if (g_conf.osd_rep == OSD_REP_CHAIN) {
- // apply all local repops
- // (pg is inactive; we will repeer)
- // note: because we hose rep_gather, clients must resubmit ops on ANY pg membership change.
- for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
- p != rep_gather.end();
- p++) {
- if (!p->second->applied)
- apply_repop(p->second);
- delete p->second->op;
- delete p->second;
- }
- rep_gather.clear();
-
- // and discard repop waiters (chain/splay artifact)
- for (hash_map<tid_t, list<Message*> >::iterator p = waiting_for_repop.begin();
- p != waiting_for_repop.end();
- p++)
- for (list<Message*>::iterator pm = p->second.begin();
- pm != p->second.end();
- pm++)
- delete *pm;
- waiting_for_repop.clear();
- }
+ // apply all local repops
+ // (pg is inactive; we will repeer)
+ for (hash_map<tid_t,RepGather*>::iterator p = rep_gather.begin();
+ p != rep_gather.end();
+ p++)
+ if (!p->second->applied)
+ apply_repop(p->second);
}
-
void ReplicatedPG::on_role_change()
{
dout(10) << "on_role_change" << dendl;
-
-
-/** clean_up_local
- * remove any objects that we're storing but shouldn't.
- * as determined by log.
- */
-void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
-{
- dout(10) << "clean_up_local" << dendl;
-
- assert(info.last_update >= log.bottom); // otherwise we need some help!
-
- if (log.backlog) {
-
- // FIXME: sloppy pobject vs object conversions abound! ***
-
- // be thorough.
- list<pobject_t> ls;
- osd->store->collection_list(info.pgid, ls);
- set<object_t> s;
-
- for (list<pobject_t>::iterator i = ls.begin();
- i != ls.end();
- i++)
- s.insert(i->oid);
-
- set<object_t> did;
- for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
- p != log.log.rend();
- p++) {
- if (did.count(p->oid)) continue;
- did.insert(p->oid);
-
- if (p->is_delete()) {
- if (s.count(p->oid)) {
- dout(10) << " deleting " << p->oid
- << " when " << p->version << dendl;
- t.remove(p->oid);
- }
- s.erase(p->oid);
- } else {
- // just leave old objects.. they're missing or whatever
- s.erase(p->oid);
- }
- }
-
- for (set<object_t>::iterator i = s.begin();
- i != s.end();
- i++) {
- dout(10) << " deleting stray " << *i << dendl;
- t.remove(*i);
- }
-
- } else {
- // just scan the log.
- set<object_t> did;
- for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
- p != log.log.rend();
- p++) {
- if (did.count(p->oid)) continue;
- did.insert(p->oid);
-
- if (p->is_delete()) {
- dout(10) << " deleting " << p->oid
- << " when " << p->version << dendl;
- t.remove(p->oid);
- } else {
- // keep old(+missing) objects, just for kicks.
- }
- }
- }
-}
-
-
-
void ReplicatedPG::cancel_recovery()
{
// forget about where missing items are, or anything we're pulling
finish_recovery();
}
-void ReplicatedPG::op_push_reply(MOSDOpReply *reply)
-{
- dout(10) << "op_push_reply from " << reply->get_source() << " " << *reply << dendl;
-
- int peer = reply->get_source().num();
- object_t oid = reply->get_oid();
-
- if (pushing.count(oid) &&
- pushing[oid].count(peer)) {
- pushing[oid].erase(peer);
-
- if (peer_missing.count(peer) == 0 ||
- peer_missing[peer].num_missing() == 0)
- uptodate_set.insert(peer);
-
- if (pushing[oid].empty()) {
- dout(10) << "pushed " << oid << " to all replicas" << dendl;
- do_peer_recovery();
- } else {
- dout(10) << "pushed " << oid << ", still waiting for push ack from "
- << pushing[oid] << dendl;
- }
- } else {
- dout(10) << "huh, i wasn't pushing " << oid << dendl;
- }
- delete reply;
-}
-
void ReplicatedPG::purge_strays()
{
dout(10) << "purge_strays " << stray_set << dendl;
stray_set.clear();
}
+
+
+/** clean_up_local
+ * Add to transaction t the removal of objects that this PG stores locally
+ * but shouldn't, as determined by the PG log.
+ *
+ * @param t  transaction that receives the remove operations; this function
+ *           only adds to t and does not apply it.
+ *
+ * The log is walked in reverse, and only the first entry met for each
+ * object id is considered.
+ *  - log.backlog set: the PG's collection is listed from the store; an
+ *    object whose considered entry is a delete is removed if it is present,
+ *    and every listed object that the log never mentions is removed as a
+ *    stray.
+ *  - log.backlog not set: an object whose considered entry is a delete is
+ *    removed without checking that it is present; nothing else is touched.
+ *
+ * Asserts that info.last_update >= log.bottom.
+ */
+void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
+{
+ dout(10) << "clean_up_local" << dendl;
+
+ assert(info.last_update >= log.bottom); // log.bottom must not be past last_update; otherwise we need some help!
+
+ if (log.backlog) {
+
+ // FIXME: sloppy pobject vs object conversions abound! *** (the collection is listed as pobject_t, but the sets and t.remove() below work on object_t)
+
+ // be thorough: start from everything actually stored in this PG's collection.
+ list<pobject_t> ls;
+ osd->store->collection_list(info.pgid, ls);
+ set<object_t> s;
+
+ for (list<pobject_t>::iterator i = ls.begin();
+ i != ls.end();
+ i++)
+ s.insert(i->oid);  // s = stored objects not yet accounted for by the log
+
+ set<object_t> did;
+ for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
+ p != log.log.rend();
+ p++) {
+ if (did.count(p->oid)) continue;  // only the first entry met (in reverse order) for an object counts
+ did.insert(p->oid);
+
+ if (p->is_delete()) {
+ if (s.count(p->oid)) {  // only remove it if we actually store it
+ dout(10) << " deleting " << p->oid
+ << " when " << p->version << dendl;
+ t.remove(p->oid);
+ }
+ s.erase(p->oid);  // accounted for by the log
+ } else {
+ // just leave old objects.. they're missing or whatever
+ s.erase(p->oid);  // accounted for by the log
+ }
+ }
+
+ for (set<object_t>::iterator i = s.begin();
+ i != s.end();
+ i++) {
+ dout(10) << " deleting stray " << *i << dendl;
+ t.remove(*i);  // stored here but never mentioned in the log
+ }
+
+ } else {
+ // no backlog: just scan the log; the collection is not listed.
+ set<object_t> did;
+ for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
+ p != log.log.rend();
+ p++) {
+ if (did.count(p->oid)) continue;  // only the first entry met (in reverse order) for an object counts
+ did.insert(p->oid);
+
+ if (p->is_delete()) {
+ dout(10) << " deleting " << p->oid
+ << " when " << p->version << dendl;
+ t.remove(p->oid);  // added without checking that the object exists locally
+ } else {
+ // keep old(+missing) objects, just for kicks.
+ }
+ }
+ }
+}