// subop metadata
tid_t rep_tid;
eversion_t version;
+ uint32_t inc_lock;
// piggybacked osd/og state
eversion_t pg_trim_to; // primary->replica: trim to here
const eversion_t get_pg_trim_to() { return st.pg_trim_to; }
void set_pg_trim_to(eversion_t v) { st.pg_trim_to = v; }
+ unsigned get_inc_lock() { return st.inc_lock; }
+ void set_inc_lock(unsigned i) { st.inc_lock = i; }
+
map<string,bufferptr>& get_attrset() { return attrset; }
void set_attrset(map<string,bufferptr> &as) { attrset.swap(as); }
const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, int o, off_t of, off_t le,
- epoch_t mape, tid_t rtid, eversion_t v) :
+ epoch_t mape, tid_t rtid, unsigned il, eversion_t v) :
Message(MSG_OSD_SUBOP) {
memset(&st, 0, sizeof(st));
st.reqid = r;
st.length = le;
st.map_epoch = mape;
st.rep_tid = rtid;
+ st.inc_lock = il;
st.version = v;
}
MOSDSubOp() {}
void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, const osd_reqid_t& reqid,
pg_t pgid, int op, pobject_t poid,
off_t offset, off_t length, bufferlist& bl,
- eversion_t& version, objectrev_t crev, objectrev_t rev)
+ eversion_t& version, __u32 inc_lock, objectrev_t crev, objectrev_t rev)
{
bool did_clone = false;
}
break;
- case CEPH_OSD_OP_MININCLOCK:
- {
- uint32_t mininc = length;
- t.setattr(poid, "mininclock", &mininc, sizeof(mininc));
- }
- break;
-
case CEPH_OSD_OP_BALANCEREADS:
{
bool bal = true;
bufferlist nbl;
nbl.claim(bl); // give buffers to store; we keep *op in memory for a long time!
t.write(poid, offset, length, nbl);
+ if (inc_lock) t.setattr(poid, "inc_lock", &inc_lock, sizeof(inc_lock));
}
break;
if (r >= 0) {
if (offset == 0 && offset + length >= (off_t)st.st_size)
t.remove(poid);
- else
+ else {
t.zero(poid, offset, length);
+ if (inc_lock) t.setattr(poid, "inc_lock", &inc_lock, sizeof(inc_lock));
+ }
} else {
// noop?
dout(10) << "apply_transaction zero on " << poid << ", but dne? stat returns " << r << dendl;
case CEPH_OSD_OP_TRUNCATE:
{ // truncate
t.truncate(poid, length);
+ if (inc_lock) t.setattr(poid, "inc_lock", &inc_lock, sizeof(inc_lock));
}
break;
repop->op->get_op(),
repop->op->get_offset(), repop->op->get_length(),
osd->osdmap->get_epoch(),
- repop->rep_tid, repop->new_version);
+ repop->rep_tid, repop->op->get_inc_lock(), repop->new_version);
wr->get_data() = repop->op->get_data(); // _copy_ bufferlist
wr->set_pg_trim_to(peers_complete_thru);
wr->set_peer_stat(osd->get_my_stat_for(now, dest));
prepare_op_transaction(repop->t, op->get_reqid(),
info.pgid, op->get_op(), poid,
op->get_offset(), op->get_length(), op->get_data(),
- nv, crev, poid.oid.rev);
+ nv, op->get_inc_lock(), crev, poid.oid.rev);
}
// (logical) local ack.
prepare_op_transaction(t, op->get_reqid(),
info.pgid, op->get_op(), poid,
op->get_offset(), op->get_length(), op->get_data(),
- nv, crev, 0);
+ nv, op->get_inc_lock(), crev, 0);
}
C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
tid_t tid = osd->get_tid();
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PULL,
0, 0,
- osd->osdmap->get_epoch(), tid, v);
+ osd->osdmap->get_epoch(), tid, 0, v);
osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd));
// take note
// send
osd_reqid_t rid; // useless?
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, bl.length(),
- osd->osdmap->get_epoch(), osd->get_tid(), v);
+ osd->osdmap->get_epoch(), osd->get_tid(), 0, v);
subop->set_data(bl); // note: claims bl, set length above here!
subop->set_attrset(attrset);
osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));