return;
}
- if (!op->may_write() && !obc->exists) {
+ if (!op->may_write() && !obc->obs.exists) {
osd->reply_op_error(op, -ENOENT);
put_object_context(obc);
return;
}
- const sobject_t& soid = obc->soid;
+ const sobject_t& soid = obc->obs.oi.soid;
OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(),
- obc->state, &obc->oi);
+ obc->state, &obc->obs);
bool noop = false;
if (op->may_write()) {
op->set_version(ctx->at_version);
dout(10) << "do_op " << soid << " " << ctx->ops
- << " ov " << obc->oi.version << " av " << ctx->at_version
+ << " ov " << obc->obs.oi.version << " av " << ctx->at_version
<< " snapc " << ctx->snapc
- << " snapset " << obc->oi.snapset
+ << " snapset " << obc->obs.oi.snapset
<< dendl;
if (is_dup(ctx->reqid)) {
}
} else {
dout(10) << "do_op " << soid << " " << ctx->ops
- << " ov " << obc->oi.version
+ << " ov " << obc->obs.oi.version
<< dendl;
}
// verify snap ordering
if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
- ctx->snapc.seq < obc->oi.snapset.seq) {
+ ctx->snapc.seq < obc->obs.oi.snapset.seq) {
dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
- << " < snapset seq " << obc->oi.snapset.seq
+ << " < snapset seq " << obc->obs.oi.snapset.seq
<< " on " << soid << dendl;
delete ctx;
put_object_context(obc);
// note some basic context for op replication that prepare_transaction may clobber
eversion_t old_last_update = ctx->at_version;
- bool old_exists = obc->exists;
- __u64 old_size = obc->size;
- eversion_t old_version = obc->oi.version;
+ bool old_exists = obc->obs.exists;
+ __u64 old_size = obc->obs.size;
+ eversion_t old_version = obc->obs.oi.version;
// we are acker.
if (!noop) {
- int result = prepare_transaction(ctx, obc->exists, obc->size);
+ int result = prepare_transaction(ctx);
if (result >= 0)
log_op_stats(soid, ctx);
// low level osd ops
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<ceph_osd_op>& ops,
- bufferlist::iterator& bp, bufferlist& odata,
- bool& exists, __u64& old_size)
+ bufferlist::iterator& bp, bufferlist& odata)
{
int result = 0;
- object_info_t& oi = *ctx->poi;
+ object_info_t& oi = ctx->obs->oi;
+ bool& exists = ctx->obs->exists;
+ __u64& old_size = ctx->obs->size;
+
const sobject_t& soid = oi.soid;
ObjectStore::Transaction& t = ctx->op_t;
// make writeable (i.e., clone if necessary)
if (op.op & CEPH_OSD_OP_MODE_WR)
- make_writeable(ctx, old_size);
+ make_writeable(ctx);
switch (op.op) {
newop.op = CEPH_OSD_OP_WRITE;
newop.offset = old_size;
newop.length = op.length;
- do_osd_ops(ctx, nops, bp, odata, exists, old_size);
+ do_osd_ops(ctx, nops, bp, odata);
}
break;
newop.offset = op.truncate_size;
dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq
<< ", truncating with " << newop << dendl;
- do_osd_ops(ctx, nops, bp, odata, exists, old_size);
+ do_osd_ops(ctx, nops, bp, odata);
} else {
// do smart truncate
interval_set<__u64> tm;
newop.op = CEPH_OSD_OP_ZERO;
newop.offset = p->first;
newop.length = p->second;
- do_osd_ops(ctx, nops, bp, odata, exists, old_size);
+ do_osd_ops(ctx, nops, bp, odata);
}
oi.truncate_info.clear();
t.setattr(info.pgid.to_coll(), coid, OI_ATTR, bv);
}
-void ReplicatedPG::make_writeable(OpContext *ctx, __u64 size)
+void ReplicatedPG::make_writeable(OpContext *ctx)
{
- object_info_t& oi = *ctx->poi;
+ object_info_t& oi = ctx->obs->oi;
const sobject_t& soid = oi.soid;
SnapContext& snapc = ctx->snapc;
ObjectStore::Transaction& t = ctx->op_t;
snaps[i] = snapc.snaps[i];
// prepare clone
- ctx->clone_obc = new ObjectContext;
+ ctx->clone_obc = new ObjectContext(coid);
ctx->clone_obc->state = ctx->mode; // take state from head obc's
- ctx->clone_obc->soid = coid;
- ctx->clone_obc->oi.soid = coid;
- ctx->clone_obc->oi.version = ctx->at_version;
- ctx->clone_obc->oi.prior_version = oi.version;
- ctx->clone_obc->oi.last_reqid = oi.last_reqid;
- ctx->clone_obc->oi.mtime = oi.mtime;
- ctx->clone_obc->oi.snaps = snaps;
+ ctx->clone_obc->obs.oi.version = ctx->at_version;
+ ctx->clone_obc->obs.oi.prior_version = oi.version;
+ ctx->clone_obc->obs.oi.last_reqid = oi.last_reqid;
+ ctx->clone_obc->obs.oi.mtime = oi.mtime;
+ ctx->clone_obc->obs.oi.snaps = snaps;
ctx->clone_obc->force_start_write();
if (is_primary())
register_object_context(ctx->clone_obc);
- _make_clone(t, soid, coid, &ctx->clone_obc->oi);
+ _make_clone(t, soid, coid, &ctx->clone_obc->obs.oi);
// add to snap bound collections
coll_t fc = make_snap_collection(t, snaps[0]);
info.stats.num_objects++;
info.stats.num_object_clones++;
oi.snapset.clones.push_back(coid.snap);
- oi.snapset.clone_size[coid.snap] = size;
- oi.snapset.clone_overlap[coid.snap].insert(0, size);
+ oi.snapset.clone_size[coid.snap] = ctx->obs->size;
+ oi.snapset.clone_overlap[coid.snap].insert(0, ctx->obs->size);
// log clone
dout(10) << " cloning v " << oi.version
}
-int ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size)
+int ReplicatedPG::prepare_transaction(OpContext *ctx)
{
assert(!ctx->ops.empty());
- object_info_t *poi = ctx->poi;
+ object_info_t *poi = &ctx->obs->oi;
+ bool& exists = ctx->obs->exists;
+
const sobject_t& soid = poi->soid;
// we'll need this to log
// prepare the actual mutation
bufferlist::iterator bp = ctx->indata.begin();
- int result = do_osd_ops(ctx, ctx->ops, bp, ctx->outdata, exists, size);
+ int result = do_osd_ops(ctx, ctx->ops, bp, ctx->outdata);
if (result < 0 || ctx->op_t.empty())
return result; // error, or read op.
update_stats();
// any completion stuff to do here?
- const sobject_t& soid = repop->ctx->poi->soid;
+ const sobject_t& soid = repop->ctx->obs->oi.soid;
ceph_osd_op& first = repop->ctx->ops[0];
switch (first.op) {
void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now,
bool old_exists, __u64 old_size, eversion_t old_version)
{
- const sobject_t& soid = repop->ctx->poi->soid;
+ const sobject_t& soid = repop->ctx->obs->oi.soid;
dout(7) << " issue_repop rep_tid " << repop->rep_tid
<< " o " << soid
<< " to osd" << dest
wr->old_exists = old_exists;
wr->old_size = old_size;
wr->old_version = old_version;
- wr->snapset = repop->obc->oi.snapset;
+ wr->snapset = repop->obc->obs.oi.snapset;
wr->snapc = repop->ctx->snapc;
wr->get_data() = repop->ctx->op->get_data(); // _copy_ bufferlist
if (osd->osdmap->get_pg_size(info.pgid) == acting.size())
if (r < 0 && !can_create)
return 0; // -ENOENT!
- obc = new ObjectContext;
- obc->soid = soid;
+ obc = new ObjectContext(soid);
if (r == 0) {
bufferlist bv;
r = osd->store->getattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
assert(r >= 0);
- obc->oi.decode(bv);
+ obc->obs.oi.decode(bv);
- if (soid.snap == CEPH_NOSNAP && !obc->oi.snapset.head_exists) {
- obc->exists = false;
- obc->size = 0;
+ if (soid.snap == CEPH_NOSNAP && !obc->obs.oi.snapset.head_exists) {
+ obc->obs.exists = false;
+ obc->obs.size = 0;
} else {
- obc->exists = true;
- obc->size = st.st_size;
+ obc->obs.exists = true;
+ obc->obs.size = st.st_size;
}
} else {
- obc->oi.soid = soid;
- obc->exists = false;
- obc->size = 0;
+ obc->obs.exists = false;
+ obc->obs.size = 0;
}
- dout(10) << "get_object_context " << soid << " read " << obc->oi << dendl;
+ dout(10) << "get_object_context " << soid << " read " << obc->obs.oi << dendl;
}
obc->ref++;
return obc;
return -ENOENT;
dout(10) << "find_object_context " << oid << " @" << snapid
- << " snapset " << hobc->oi.snapset << dendl;
+ << " snapset " << hobc->obs.oi.snapset << dendl;
// head?
- if (snapid > hobc->oi.snapset.seq) {
+ if (snapid > hobc->obs.oi.snapset.seq) {
dout(10) << "find_object_context " << head
- << " want " << snapid << " > snapset seq " << hobc->oi.snapset.seq
+ << " want " << snapid << " > snapset seq " << hobc->obs.oi.snapset.seq
<< " -- HIT" << dendl;
*pobc = hobc;
return 0;
// which clone would it be?
unsigned k = 0;
- while (k < hobc->oi.snapset.clones.size() &&
- hobc->oi.snapset.clones[k] < snapid)
+ while (k < hobc->obs.oi.snapset.clones.size() &&
+ hobc->obs.oi.snapset.clones[k] < snapid)
k++;
- if (k == hobc->oi.snapset.clones.size()) {
+ if (k == hobc->obs.oi.snapset.clones.size()) {
dout(10) << "get_object_context no clones with last >= snapid " << snapid << " -- DNE" << dendl;
put_object_context(hobc);
return -ENOENT;
}
- sobject_t soid(oid, hobc->oi.snapset.clones[k]);
+ sobject_t soid(oid, hobc->obs.oi.snapset.clones[k]);
put_object_context(hobc); // we're done with head obc
hobc = 0;
ObjectContext *obc = get_object_context(soid);
// clone
- dout(20) << "get_object_context " << soid << " snaps " << obc->oi.snaps << dendl;
- snapid_t first = obc->oi.snaps[obc->oi.snaps.size()-1];
- snapid_t last = obc->oi.snaps[0];
+ dout(20) << "get_object_context " << soid << " snaps " << obc->obs.oi.snaps << dendl;
+ snapid_t first = obc->obs.oi.snaps[obc->obs.oi.snaps.size()-1];
+ snapid_t last = obc->obs.oi.snaps[0];
if (first <= snapid) {
dout(20) << "get_object_context " << soid << " [" << first << "," << last
<< "] contains " << snapid << " -- HIT" << dendl;
void ReplicatedPG::put_object_context(ObjectContext *obc)
{
- dout(10) << "put_object_context " << obc->soid << " "
+ dout(10) << "put_object_context " << obc->obs.oi.soid << " "
<< obc->ref << " -> " << (obc->ref-1) << dendl;
if (obc->wake) {
assert(obc->waiting.empty());
if (obc->registered)
- object_contexts.erase(obc->soid);
+ object_contexts.erase(obc->obs.oi.soid);
delete obc;
if (object_contexts.empty())
osd->logger->inc(l_osd_r_wr);
osd->logger->inc(l_osd_r_wrb, op->get_data().length());
+ ObjectState obs(op->poid);
+ obs.oi.version = op->old_version;
+ obs.oi.snapset = op->snapset;
+ obs.exists = op->old_exists;
+ obs.size = op->old_size;
- object_info_t oi(op->poid);
- oi.version = op->old_version;
- oi.snapset = op->snapset;
-
- OpContext ctx(op, op->reqid, op->ops, op->get_data(), ObjectContext::RMW, &oi);
+ OpContext ctx(op, op->reqid, op->ops, op->get_data(), ObjectContext::RMW, &obs);
if (!op->noop) {
ctx.mtime = op->mtime;
ctx.at_version = op->version;
ctx.snapc = op->snapc;
- bool exists = op->old_exists;
- __u64 size = op->old_size;
-
- prepare_transaction(&ctx, exists, size);
+ prepare_transaction(&ctx);
log_op(&ctx);
}
object_info_t oi(soid);
oi.version = latest->version;
oi.prior_version = latest->prior_version;
- oi.last_reqid = headobc->oi.last_reqid;
- oi.mtime = headobc->oi.mtime;
+ oi.last_reqid = headobc->obs.oi.last_reqid;
+ oi.mtime = headobc->obs.oi.mtime;
::decode(oi.snaps, latest->snaps);
_make_clone(t, head, soid, &oi);
class ReplicatedPG : public PG {
public:
+ struct ObjectState {  // bundles an object's info, existence flag and size so they travel together
+ object_info_t oi;  // object info; constructed from the sobject_t handed to the constructor
+ bool exists;  // whether the object is considered to exist; starts out false
+ __u64 size;  // object size; starts out 0
+
+ ObjectState(const sobject_t& s) : oi(s), exists(false), size(0) {}  // begins as a non-existent, zero-size object identified by s
+ };
+
/*
object access states:
}
}
- sobject_t soid;
int ref;
bool registered;
list<Message*> waiting;
bool wake;
- bool exists;
- __u64 size;
-
- object_info_t oi;
-
+ ObjectState obs;
void get() { ++ref; }
}
}
- ObjectContext() : ref(0), registered(true), state(IDLE), num_wr(0), num_rmw(0), wake(false),
- exists(false), size(0), oi(soid) {}
+ ObjectContext(const sobject_t& s) :  // builds a context for object s: zero refs, IDLE state, no wake pending
+ ref(0), registered(true), state(IDLE), num_wr(0), num_rmw(0), wake(false),  // NOTE(review): registered starts true, yet register_object_context() only inserts when it is false -- confirm this is intended
+ obs(s) {}  // obs carries the soid (via obs.oi), exists=false and size=0
};
bufferlist outdata;
ObjectContext::state_t mode; // DELAYED or RMW (or _FLUSHING variant?)
- object_info_t *poi;
+
+ ObjectState *obs;
utime_t mtime;
SnapContext snapc; // writer snap context
int data_off; // FIXME: we may want to kill this msgr hint off at some point!
OpContext(Message *_op, osd_reqid_t _reqid, vector<ceph_osd_op>& _ops, bufferlist& _data,
- ObjectContext::state_t _mode, object_info_t *_poi) :
- op(_op), reqid(_reqid), ops(_ops), indata(_data), mode(_mode), poi(_poi),
+ ObjectContext::state_t _mode, ObjectState *_obs) :  // _obs: state of the object this op acts on; only the pointer is stored, nothing is copied
+ op(_op), reqid(_reqid), ops(_ops), indata(_data), mode(_mode), obs(_obs),  // no clone context and a zero data offset to start with
clone_obc(0), data_off(0) {}
~OpContext() {
assert(!clone_obc);
void register_object_context(ObjectContext *obc) {
if (!obc->registered) {
obc->registered = true;
- object_contexts[obc->soid] = obc;
+ object_contexts[obc->obs.oi.soid] = obc;  // key the map by the soid now held inside obs.oi (ObjectContext no longer has its own soid member)
}
}
void put_object_context(ObjectContext *obc);
void _make_clone(ObjectStore::Transaction& t,
sobject_t head, sobject_t coid,
object_info_t *poi);
- void make_writeable(OpContext *ctx, __u64 size);
+ void make_writeable(OpContext *ctx);
int do_osd_ops(OpContext *ctx, vector<ceph_osd_op>& ops,
- bufferlist::iterator& bp, bufferlist& odata,
- bool& exists, __u64& size);
+ bufferlist::iterator& bp, bufferlist& odata);
void log_op_stats(const sobject_t &soid, OpContext *ctx);
void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);
- int prepare_transaction(OpContext *ctx, bool& exists, __u64& size);
+ int prepare_transaction(OpContext *ctx);
void log_op(OpContext *ctx);
friend class C_OSD_OpCommit;
inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc)
{
- out << "obc(" << obc.soid << " " << obc.get_state_name(obc.state);
+ out << "obc(" << obc.obs.oi.soid << " " << obc.get_state_name(obc.state);
if (!obc.waiting.empty())
out << " WAITING";
out << ")";