return 0;
}
-void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
- sobject_t soid,
- vector<ceph_osd_op>& ops, bufferlist& bl, utime_t mtime,
- bool& exists, __u64& size, object_info_t& oi,
- eversion_t at_version, SnapContext& snapc,
- eversion_t trim_to)
+void ReplicatedPG::prepare_transaction(WriteOpContext *ctx, bool& exists, __u64& size, eversion_t trim_to) // Runs every op in ctx->ops through prepare_simple_op() into ctx->t, updates *ctx->poi, then adds a log entry and calls write_info()/append_log() on ctx->t.
{
bufferlist log_bl;
- eversion_t log_version = at_version;
- assert(!ops.empty());
+ eversion_t log_version = ctx->at_version; // version handed to append_log() at the end
+ assert(!ctx->ops.empty()); // asserted precondition: the op list is non-empty
- eversion_t old_version = oi.version;
+ eversion_t old_version = ctx->poi->version; // pre-write version; reused for prior_version and for the log entry
+ // local aliases into the context, so the code below keeps using the short names
+ sobject_t& soid = ctx->soid;
+ vector<ceph_osd_op>& ops = ctx->ops;
+ object_info_t *poi = ctx->poi;
// apply ops
bool did_snap = false;
- bufferlist::iterator bp = bl.begin();
+ bufferlist::iterator bp = ctx->data.begin(); // one iterator over the payload, passed by reference to every prepare_simple_op() call
for (unsigned i=0; i<ops.size(); i++) {
// clone?
if (!did_snap && soid.snap &&
!ceph_osd_op_type_lock(ops[i].op)) { // is a (non-lock) modification
- prepare_clone(t, log_bl, reqid, info.stats, soid, size, oi,
- at_version, snapc);
+ prepare_clone(ctx->t, log_bl, ctx->reqid, info.stats, soid, size, *poi, // did_snap guards this: at most one clone per call
+ ctx->at_version, ctx->snapc);
did_snap = true;
}
- prepare_simple_op(t, reqid, info.stats, soid, size, exists, oi,
- ops, i, bp, snapc);
+ prepare_simple_op(ctx->t, ctx->reqid, info.stats, soid, size, exists, *poi, // exists and size go in by reference; exists is re-read below
+ ops, i, bp, ctx->snapc);
}
// finish.
- oi.version = at_version;
+ poi->version = ctx->at_version; // assigned whether or not the object still exists
if (exists) {
- oi.version = at_version;
- oi.prior_version = old_version;
- oi.last_reqid = reqid;
- if (mtime != utime_t()) {
- oi.mtime = mtime;
- dout(10) << " set mtime to " << oi.mtime << dendl;
+ poi->version = ctx->at_version; // repeats the assignment made just before the if
+ poi->prior_version = old_version;
+ poi->last_reqid = ctx->reqid;
+ if (ctx->mtime != utime_t()) { // a default-constructed mtime leaves the stored mtime untouched
+ poi->mtime = ctx->mtime;
+ dout(10) << " set mtime to " << poi->mtime << dendl;
} else {
- dout(10) << " mtime unchanged at " << oi.mtime << dendl;
+ dout(10) << " mtime unchanged at " << poi->mtime << dendl;
}
- bufferlist bv(sizeof(oi));
- ::encode(oi, bv);
- t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
+ bufferlist bv(sizeof(*poi));
+ ::encode(*poi, bv);
+ ctx->t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv); // store the encoded object_info_t as the OI_ATTR attribute of soid
}
// append to log
int logopcode = Log::Entry::MODIFY;
if (!exists)
logopcode = Log::Entry::DELETE;
- Log::Entry logentry(logopcode, soid, at_version, old_version, reqid, mtime);
+ Log::Entry logentry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime); // MODIFY, or DELETE when exists is false
add_log_entry(logentry, log_bl);
// write pg info, log to disk
- write_info(t);
- append_log(t, log_bl, log_version, trim_to);
+ write_info(ctx->t);
+ append_log(ctx->t, log_bl, log_version, trim_to); // trim_to is passed through unchanged from the caller
}
assert(!repop->applied);
Context *oncommit = new C_OSD_ModifyCommit(this, repop);
- unsigned r = osd->store->apply_transaction(repop->t, oncommit);
+ unsigned r = osd->store->apply_transaction(repop->ctx->t, oncommit);
if (r)
dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << dendl;
// discard my reference to the buffer
- repop->op->get_data().clear();
+ repop->ctx->op->get_data().clear();
repop->applied = true;
put_projected_object(repop->pinfo);
+ repop->pinfo = 0;
update_stats();
// any completion stuff to do here?
- sobject_t soid(repop->op->get_oid(), CEPH_NOSNAP);
- ceph_osd_op& first = repop->op->ops[0];
+ sobject_t& soid = repop->ctx->soid;
+ ceph_osd_op& first = repop->ctx->ops[0];
switch (first.op) {
#if 0
{
dout(10) << "eval_repop " << *repop << dendl;
+ MOSDOp *op = (MOSDOp *)repop->ctx->op;
+
// disk?
if (repop->can_send_disk()) {
- if (repop->op->wants_ondisk()) {
+ if (op->wants_ondisk()) {
// send commit.
- MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
- osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
+ osd->messenger->send_message(reply, op->get_orig_source_inst());
repop->sent_disk = true;
}
}
// nvram?
else if (repop->can_send_nvram()) {
- if (repop->op->wants_onnvram()) {
+ if (op->wants_onnvram()) {
// send commit.
- MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONNVRAM);
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONNVRAM);
dout(10) << " sending onnvram on " << *repop << " " << reply << dendl;
- osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
+ osd->messenger->send_message(reply, op->get_orig_source_inst());
repop->sent_nvram = true;
}
}
if (!repop->applied)
apply_repop(repop);
- if (repop->op->wants_ack()) {
+ if (op->wants_ack()) {
// send ack
- MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
- osd->messenger->send_message(reply, repop->op->get_orig_source_inst());
+ osd->messenger->send_message(reply, op->get_orig_source_inst());
repop->sent_ack = true;
}
void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
{
- sobject_t soid(repop->op->get_oid(), CEPH_NOSNAP);
+ sobject_t& soid = repop->ctx->soid; // Builds one MOSDSubOp for osd `dest` from the repop's write context and sends it; soid is now read from the context.
dout(7) << " issue_repop rep_tid " << repop->rep_tid
<< " o " << soid
<< " to osd" << dest
// forward the write/update/whatever
int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
- MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, soid,
- repop->op->ops, repop->noop, acks_wanted,
+ MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid, // reqid and ops are taken from the write context
+ repop->ctx->ops, repop->noop, acks_wanted,
osd->osdmap->get_epoch(),
- repop->rep_tid, repop->at_version);
- wr->mtime = repop->mtime;
+ repop->rep_tid, repop->ctx->at_version);
+ wr->mtime = repop->ctx->mtime; // mtime, like at_version and snapc, now lives on the context rather than on the RepGather
wr->old_exists = repop->pinfo->exists;
wr->old_size = repop->pinfo->size;
wr->old_version = repop->pinfo->oi.version;
wr->snapset = repop->pinfo->oi.snapset;
- wr->snapc = repop->snapc;
- wr->get_data() = repop->op->get_data(); // _copy_ bufferlist
+ wr->snapc = repop->ctx->snapc;
+ wr->get_data() = repop->ctx->op->get_data(); // _copy_ bufferlist
if (osd->osdmap->get_pg_size(info.pgid) == acting.size())
wr->pg_trim_to = peers_complete_thru;
wr->peer_stat = osd->get_my_stat_for(now, dest);
osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
}
-ReplicatedPG::RepGather *ReplicatedPG::new_repop(MOSDOp *op, bool noop,
- tid_t rep_tid,
- ProjectedObjectInfo *pinfo,
- eversion_t nv,
- SnapContext& snapc)
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(WriteOpContext *ctx, ProjectedObjectInfo *pinfo,
+ bool noop, tid_t rep_tid)
{
- dout(10) << "new_repop rep_tid " << rep_tid << " on " << *op << dendl;
+ dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl;
- RepGather *repop = new RepGather(op, noop, rep_tid,
- pinfo,
- nv, info.last_complete,
- snapc);
+ RepGather *repop = new RepGather(ctx, pinfo, noop, rep_tid, info.last_complete);
// initialize gather sets
for (unsigned i=0; i<acting.size(); i++) {
void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
int fromosd, eversion_t pg_complete_thru)
{
- MOSDOp *op = repop->op;
+ MOSDOp *op = (MOSDOp *)repop->ctx->op;
dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *op
<< " result " << result
void ReplicatedPG::op_modify(MOSDOp *op)
{
int whoami = osd->get_nodeid();
- sobject_t soid(op->get_oid(), CEPH_NOSNAP);
+
+ WriteOpContext *ctx = new WriteOpContext(op, op->ops, op->get_data(),
+ sobject_t(op->get_oid(), CEPH_NOSNAP),
+ op->get_reqid(), op->get_mtime());
+ sobject_t& soid = ctx->soid;
// balance-reads set?
#if 0
// add to wait queue
dout(-10) << "preprocess_op waiting for unbalance-reads on " << soid.oid << dendl;
waiting_for_unbalanced_reads[soid.oid].push_back(op);
+ delete ctx;
return;
}
#endif
// get existing object info
ProjectedObjectInfo *pinfo = get_projected_object(soid);
+ ctx->poi = &pinfo->oi;
// --- locking ---
// wrlock?
- if (!op->ops.empty() && // except noop; we just want to flush
+ if (!ctx->ops.empty() && // except noop; we just want to flush
block_if_wrlocked(op, pinfo->oi)) {
put_projected_object(pinfo);
+ delete ctx;
return; // op will be handled later, after the object unlocks
}
// dup op?
bool noop = false;
const char *opname;
- if (op->ops.empty()) {
+ if (ctx->ops.empty()) {
opname = "no-op";
noop = true;
- } else if (is_dup(op->get_reqid())) {
- dout(3) << "op_modify " << op->ops << " dup op " << op->get_reqid()
+ } else if (is_dup(ctx->reqid)) {
+ dout(3) << "op_modify " << ctx->ops << " dup op " << ctx->reqid
<< ", doing WRNOOP" << dendl;
opname = "no-op";
noop = true;
} else
- opname = ceph_osd_op_name(op->ops[0].op);
+ opname = ceph_osd_op_name(ctx->ops[0].op);
// version
- eversion_t at_version = log.top;
+ ctx->at_version = log.top;
if (!noop) {
- at_version.epoch = osd->osdmap->get_epoch();
- at_version.version++;
- assert(at_version > info.last_update);
- assert(at_version > log.top);
+ ctx->at_version.epoch = osd->osdmap->get_epoch();
+ ctx->at_version.version++;
+ assert(ctx->at_version > info.last_update);
+ assert(ctx->at_version > log.top);
}
// snap
- SnapContext snapc;
- snapc.seq = op->get_snap_seq();
- snapc.snaps = op->get_snaps();
+ ctx->snapc.seq = op->get_snap_seq();
+ ctx->snapc.snaps = op->get_snaps();
// set version in op, for benefit of client and our eventual reply
- op->set_version(at_version);
+ op->set_version(ctx->at_version);
dout(10) << "op_modify " << opname
<< " " << soid
- << " ov " << pinfo->oi.version << " av " << at_version
- << " snapc " << snapc
+ << " ov " << pinfo->oi.version << " av " << ctx->at_version
+ << " snapc " << ctx->snapc
<< " snapset " << pinfo->oi.snapset
<< dendl;
// verify snap ordering
if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
- snapc.seq < pinfo->oi.snapset.seq) {
- dout(10) << " ORDERSNAP flag set and snapc seq " << snapc.seq << " < snapset seq " << pinfo->oi.snapset.seq
+ ctx->snapc.seq < pinfo->oi.snapset.seq) {
+ dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
+ << " < snapset seq " << pinfo->oi.snapset.seq
<< " on " << soid << dendl;
+ put_projected_object(pinfo);
+ delete ctx;
osd->reply_op_error(op, -EOLDSNAPC);
return;
}
}
}
- if (op->get_data().length()) {
+ if (ctx->data.length()) {
osd->logger->inc(l_osd_c_wr);
- osd->logger->inc(l_osd_c_wrb, op->get_data().length());
+ osd->logger->inc(l_osd_c_wrb, ctx->data.length());
}
// note my stats
// issue replica writes
tid_t rep_tid = osd->get_tid();
- RepGather *repop = new_repop(op, noop, rep_tid, pinfo, at_version, snapc);
+ RepGather *repop = new_repop(ctx, pinfo, noop, rep_tid);
for (unsigned i=1; i<acting.size(); i++)
issue_repop(repop, acting[i], now);
- eversion_t old_last_update = at_version;
+ eversion_t old_last_update = ctx->at_version;
// trim log?
eversion_t trim_to = is_clean() ? peers_complete_thru : eversion_t();
// we are acker.
if (!noop) {
// log and update later.
- prepare_transaction(repop->t, op->get_reqid(), soid, op->ops, op->get_data(), repop->mtime,
- pinfo->exists, pinfo->size, pinfo->oi,
- at_version, snapc,
- trim_to);
+ prepare_transaction(ctx, pinfo->exists, pinfo->size, trim_to);
}
// keep peer_info up to date
for (unsigned i=1; i<acting.size(); i++) {
Info &in = peer_info[acting[i]];
- in.last_update = at_version;
+ in.last_update = ctx->at_version;
if (in.last_complete == old_last_update)
- in.last_update = at_version;
+ in.last_update = ctx->at_version;
}
// (logical) local ack.
object_info_t oi(op->poid);
oi.version = op->old_version;
oi.snapset = op->snapset;
- prepare_transaction(t, op->reqid,
- op->poid, op->ops, op->get_data(), op->mtime,
- op->old_exists, op->old_size, oi, op->version,
- op->snapc, op->pg_trim_to);
+
+ WriteOpContext ctx(op, op->ops, op->get_data(), op->poid, op->reqid, op->mtime);
+ ctx.poi = &oi;
+ ctx.at_version = op->version;
+ ctx.snapc = op->snapc;
+
+ bool exists = op->old_exists;
+ __u64 size = op->old_size;
+
+ prepare_transaction(&ctx, exists, size, op->pg_trim_to);
}
C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
repop_map.erase(repop->rep_tid);
if (requeue) {
- dout(10) << " requeuing " << *repop->op << dendl;
- rq.push_back(repop->op);
- repop->op = 0;
+ dout(10) << " requeuing " << *repop->ctx->op << dendl;
+ rq.push_back(repop->ctx->op);
+ repop->ctx->op = 0;
}
repop->put();
};
/*
- * gather state on the primary/head while replicating an osd op.
+ * Capture all state associated with a write operation being processed
+ * on the current OSD.
+ */
+ struct WriteOpContext { // bundles the request, target object, versions and the transaction for one write
+ Message *op; // originating message; no destructor is declared here, so destroying the context does not free it. NOTE(review): RepGather::put() deletes the context but no longer deletes op - confirm op is released elsewhere
+ vector<ceph_osd_op>& ops; // reference member: bound to the vector passed to the constructor, no copy is made
+ bufferlist& data; // reference member: bound to the bufferlist passed to the constructor, no copy is made
+ sobject_t soid; // object the write targets
+ osd_reqid_t reqid; // request id; prepare_transaction stores it as last_reqid and in the log entry
+ utime_t mtime; // prepare_transaction compares this against utime_t() to decide whether to update the object's mtime
+
+ SnapContext snapc; // writer snap context
+
+ //ProjectedObjectInfo *pinfo; // projected object state
+ object_info_t *poi; // set to 0 by the constructor; callers assign it before prepare_transaction dereferences it
+
+ eversion_t at_version; // pg's current version pointer
+ ObjectStore::Transaction t; // transaction that prepare_transaction fills in
+
+ WriteOpContext(Message *_op, vector<ceph_osd_op>& _ops, bufferlist& _data, // snapc and at_version are default-constructed here and set by the caller afterwards
+ sobject_t _soid, osd_reqid_t _reqid, utime_t _mtime) :
+ op(_op), ops(_ops), data(_data), soid(_soid), reqid(_reqid), mtime(_mtime),
+ poi(0) {}
+ };
+
+ /*
+ * State on the PG primary associated with the replicated mutation
*/
class RepGather {
public:
xlist<RepGather*>::item queue_item;
int nref;
- class MOSDOp *op;
+ WriteOpContext *ctx;
+ ProjectedObjectInfo *pinfo;
+
tid_t rep_tid;
- utime_t mtime;
bool noop;
- ObjectStore::Transaction t;
bool applied, aborted;
set<int> waitfor_ack;
utime_t start;
- ProjectedObjectInfo *pinfo;
-
- eversion_t at_version;
- SnapContext snapc;
-
eversion_t pg_local_last_complete;
map<int,eversion_t> pg_complete_thru;
- RepGather(MOSDOp *o, bool noop_, tid_t rt,
- ProjectedObjectInfo *i,
- eversion_t av, eversion_t lc,
- SnapContext& sc) :
+ RepGather(WriteOpContext *c, ProjectedObjectInfo *pi, bool noop_, tid_t rt, // takes the write context and projected object info; op, mtime, at_version and snapc are no longer separate arguments
+ eversion_t lc) :
queue_item(this),
- nref(1), op(o), rep_tid(rt),
- mtime(op->get_mtime()), noop(noop_),
+ nref(1), // starts with a single reference
+ ctx(c), pinfo(pi), // put() deletes ctx when nref reaches 0, and asserts that pinfo has been cleared by then
+ rep_tid(rt),
+ noop(noop_),
applied(false), aborted(false),
sent_ack(false), sent_nvram(false), sent_disk(false),
- pinfo(i),
- at_version(av),
- snapc(sc),
- pg_local_last_complete(lc) {
- mtime = op->get_mtime();
- }
+ pg_local_last_complete(lc) { } // empty body: mtime is now read through ctx->mtime
bool can_send_ack() {
return
void put() {
assert(nref > 0);
if (--nref == 0) {
- delete op;
+ assert(!pinfo);
+ delete ctx;
delete this;
//generic_dout(0) << "deleting " << this << dendl;
}
void apply_repop(RepGather *repop);
void eval_repop(RepGather*);
void issue_repop(RepGather *repop, int dest, utime_t now);
- RepGather *new_repop(MOSDOp *op, bool noop, tid_t rep_tid,
- ProjectedObjectInfo *pinfo,
- eversion_t nv,
- SnapContext& snapc);
+ RepGather *new_repop(WriteOpContext *ctx, ProjectedObjectInfo *pinfo, bool noop, tid_t rep_tid); // allocates a RepGather holding ctx and pinfo; version and snap context are read from ctx
void repop_ack(RepGather *repop,
int result, int ack_type,
int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
sobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp, SnapContext& snapc);
- void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
- sobject_t poid,
- vector<ceph_osd_op>& ops, bufferlist& bl, utime_t mtime,
- bool& exists, __u64& size, object_info_t& oi,
- eversion_t at_version, SnapContext& snapc,
- eversion_t trim_to);
+ void prepare_transaction(WriteOpContext *ctx, bool& exists, __u64& size, eversion_t trim_to); // ctx replaces the former transaction/reqid/object/ops/data/mtime/oi/version/snapc arguments; exists and size are still passed by reference
friend class C_OSD_ModifyCommit;
friend class C_OSD_RepModifyCommit;
//<< " wfnvram=" << repop.waitfor_nvram
<< " wfdisk=" << repop.waitfor_disk;
out << " pct=" << repop.pg_complete_thru;
- if (repop.op)
- out << " op=" << *(repop.op);
+ if (repop.ctx->op)
+ out << " op=" << *(repop.ctx->op);
out << ")";
return out;
}