osd->complete_notify((void *)notif, obc);
}
-int ReplicatedPG::prepare_call(MOSDOp *osd_op, ceph_osd_op& op,
- string& cname, string& mname,
- bufferlist::iterator& bp,
- ClassHandler::ClassMethod **pmethod)
-{
- ClassHandler::ClassData *cls;
- int result = osd->class_handler->open_class(cname, &cls);
- assert(result == 0);
-
- bufferlist outdata;
- ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());
- if (!method) {
- dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl;
- return -EINVAL;
- }
- *pmethod = method;
- return 0;
-}
-
// ========================================================================
// low level osd ops
{
int result = 0;
SnapSetContext *ssc = ctx->obc->ssc;
- object_info_t& oi = ctx->obs->oi;
+ ObjectState& obs = ctx->new_obs;
+ object_info_t& oi = obs.oi;
const sobject_t& soid = oi.soid;
dout(10) << "do_osd_op " << osd_op << dendl;
- // modify?
- int flags;
- bool is_modify;
- string cname, mname;
bufferlist::iterator bp = osd_op.data.begin();
+
+ // user-visible modifcation?
switch (op.op) {
- case CEPH_OSD_OP_CALL:
- bp.copy(op.cls.class_len, cname);
- bp.copy(op.cls.method_len, mname);
- {
- ClassHandler::ClassData *cls;
- int r = osd->class_handler->open_class(cname, &cls);
- assert(r == 0);
- flags = cls->get_method_flags(mname.c_str());
- }
- is_modify = flags & CLS_METHOD_WR;
- dout(10) << " class " << cname << "." << mname << " flags " << flags << " is_modify " << is_modify << dendl;
+ // non user-visible modifications
+ case CEPH_OSD_OP_WATCH:
break;
-
default:
- is_modify = (op.op & CEPH_OSD_OP_MODE_WR);
- break;
+ if (op.op & CEPH_OSD_OP_MODE_WR)
+ ctx->user_modify = true;
}
- ctx->reply_version = oi.user_version;
- // make writeable (i.e., clone if necessary)
- if (is_modify) {
- if (!ctx->snapc.is_valid())
- return -EINVAL;
- make_writeable(ctx);
-
- if (op.op != CEPH_OSD_OP_WATCH) {
- /* update the user_version for any modify ops, except for the watch op */
- oi.user_version = ctx->at_version;
- ctx->reply_version = oi.user_version;
- }
- }
-
- dout(0) << "oi.user_version=" << oi.user_version << " is_modify=" << is_modify << dendl;
-
// munge ZERO -> TRUNCATE? (don't munge to DELETE or we risk hosing attributes)
if (op.op == CEPH_OSD_OP_ZERO &&
- ctx->obs->exists &&
+ obs.exists &&
op.extent.offset + op.extent.length >= oi.size) {
dout(10) << " munging ZERO " << op.extent.offset << "~" << op.extent.length
<< " -> TRUNCATE " << op.extent.offset << " (old size is " << oi.size << ")" << dendl;
case CEPH_OSD_OP_CALL:
{
+ string cname, mname;
+ bp.copy(op.cls.class_len, cname);
+ bp.copy(op.cls.method_len, mname);
+
bufferlist indata;
bp.copy(op.cls.indata_len, indata);
- ClassHandler::ClassMethod *method;
- result = prepare_call((MOSDOp *)ctx->op, op, cname, mname, bp, &method);
- if (result == -EAGAIN)
- return result;
-
- if (!result) {
- bufferlist outdata;
+ ClassHandler::ClassData *cls;
+ int result = osd->class_handler->open_class(cname, &cls);
+ assert(result == 0);
- dout(10) << "call method " << cname << "." << mname << dendl;
- result = method->exec((cls_method_context_t)&ctx, indata, outdata);
- dout(10) << "method called response length=" << outdata.length() << dendl;
- op.extent.length = outdata.length();
- odata.claim_append(outdata);
+ ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());
+ if (!method) {
+ dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl;
+ result = -EINVAL;
+ break;
}
+
+ int flags = method->get_flags();
+ if (flags & CLS_METHOD_WR)
+ ctx->user_modify = true;
+
+ bufferlist outdata;
+ dout(10) << "call method " << cname << "." << mname << dendl;
+ result = method->exec((cls_method_context_t)&ctx, indata, outdata);
+ dout(10) << "method called response length=" << outdata.length() << dendl;
+ op.extent.length = outdata.length();
+ odata.claim_append(outdata);
}
break;
case CEPH_OSD_OP_STAT:
{
- if (ctx->obs->exists) {
+ if (obs.exists) {
::encode(oi.size, odata);
::encode(oi.mtime, odata);
dout(10) << "stat oi has " << oi.size << " " << oi.mtime << dendl;
{ // write full object
bufferlist nbl;
bp.copy(op.extent.length, nbl);
- if (ctx->obs->exists)
+ if (obs.exists)
t.truncate(coll, soid, 0);
t.write(coll, soid, op.extent.offset, op.extent.length, nbl);
if (ssc->snapset.clones.size()) {
case CEPH_OSD_OP_ZERO:
{ // zero
assert(op.extent.length);
- if (ctx->obs->exists) {
+ if (obs.exists) {
t.zero(coll, soid, op.extent.offset, op.extent.length);
if (ssc->snapset.clones.size()) {
snapid_t newest = *ssc->snapset.clones.rbegin();
case CEPH_OSD_OP_CREATE:
{ // zero
int flags = le32_to_cpu(op.flags);
- if (ctx->obs->exists && (flags & CEPH_OSD_OP_FLAG_EXCL))
+ if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL))
result = -EEXIST; /* this is an exclusive create */
else {
t.touch(coll, soid);
case CEPH_OSD_OP_TRUNCATE:
{ // truncate
- if (!ctx->obs->exists) {
+ if (!obs.exists) {
dout(10) << " object dne, truncate is a no-op" << dendl;
break;
}
case CEPH_OSD_OP_SETXATTR:
{
- if (!ctx->obs->exists)
+ if (!obs.exists)
t.touch(coll, soid);
string aname;
bp.copy(op.xattr.name_len, aname);
string name = "_" + aname;
bufferlist bl;
bp.copy(op.xattr.value_len, bl);
- if (!ctx->obs->exists) // create object if it doesn't yet exist.
+ if (!obs.exists) // create object if it doesn't yet exist.
t.touch(coll, soid);
t.setattr(coll, soid, name, bl);
ssc->snapset.head_exists = true;
result = -EOPNOTSUPP;
}
- if ((is_modify) &&
- !ctx->obs->exists && ssc->snapset.head_exists) {
+ if (!obs.exists && ssc->snapset.head_exists) {
dout(20) << " num_objects " << info.stats.num_objects << " -> " << (info.stats.num_objects+1) << dendl;
info.stats.num_objects++;
- ctx->obs->exists = true;
+ obs.exists = true;
}
if (result)
inline void ReplicatedPG::_delete_head(OpContext *ctx)
{
SnapSetContext *ssc = ctx->obc->ssc;
- object_info_t& oi = ctx->obs->oi;
+ ObjectState& obs = ctx->new_obs;
+ object_info_t& oi = obs.oi;
const sobject_t& soid = oi.soid;
ObjectStore::Transaction& t = ctx->op_t;
- if (ctx->obs->exists)
+ if (obs.exists)
t.remove(coll, soid);
if (ssc->snapset.clones.size()) {
snapid_t newest = *ssc->snapset.clones.rbegin();
ssc->snapset.clone_overlap.erase(newest); // ok, redundant.
ssc->snapset.clone_overlap[newest];
}
- if (ctx->obs->exists) {
+ if (obs.exists) {
info.stats.num_objects--;
info.stats.num_bytes -= oi.size;
info.stats.num_kb -= SHIFT_ROUND_UP(oi.size, 10);
oi.size = 0;
ssc->snapset.head_exists = false;
- ctx->obs->exists = false;
+ obs.exists = false;
}
info.stats.num_wr++;
}
void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
{
SnapSetContext *ssc = ctx->obc->ssc;
- object_info_t& oi = ctx->obs->oi;
+ ObjectState& obs = ctx->new_obs;
+ object_info_t& oi = obs.oi;
const sobject_t& soid = oi.soid;
ObjectStore::Transaction& t = ctx->op_t;
snapid_t snapid = (uint64_t)op.snap.snapid;
}
} else { //we got our context, let's use it to do the rollback!
sobject_t& rollback_to_sobject = rollback_to->obs.oi.soid;
- if (ctx->clone_obc && *ctx->clone_obc->obs.oi.snaps.rbegin() <= snapid) {
- //just cloned the rollback target, we don't need to do anything!
+ if (rollback_to->obs.oi.soid.snap == CEPH_NOSNAP) {
+ // rolling back to the head; we just need to clone it.
+ ctx->modify = true;
} else {
/* 1) Delete current head
* 2) Clone correct snapshot into head
<< " and rolling back to old snap" << dendl;
_delete_head(ctx);
- ctx->obs->exists = true; //we're about to recreate it
+ obs.exists = true; //we're about to recreate it
map<string, bufferptr> attrs;
t.clone(coll,
oi.oloc,
false);
assert(clone_context);
- ctx->obs->oi.size = clone_context->obs.oi.size;
+ obs.oi.size = clone_context->obs.oi.size;
map<snapid_t, interval_set<uint64_t> >::iterator iter =
ssc->snapset.clone_overlap.lower_bound(snapid);
void ReplicatedPG::make_writeable(OpContext *ctx)
{
SnapSetContext *ssc = ctx->obc->ssc;
- object_info_t& oi = ctx->obs->oi;
+ ObjectState& obs = ctx->new_obs;
+ object_info_t& oi = obs.oi;
const sobject_t& soid = oi.soid;
SnapContext& snapc = ctx->snapc;
- ObjectStore::Transaction& t = ctx->op_t;
+ ObjectStore::Transaction t;
// clone?
assert(soid.snap == CEPH_NOSNAP);
info.stats.num_objects++;
info.stats.num_object_clones++;
ssc->snapset.clones.push_back(coid.snap);
- ssc->snapset.clone_size[coid.snap] = ctx->obs->oi.size;
+ ssc->snapset.clone_size[coid.snap] = obs.oi.size;
// clone_overlap should contain an entry for each clone
// (an empty interval_set if there is no overlap)
ssc->snapset.clone_overlap[coid.snap];
- if (ctx->obs->oi.size)
- ssc->snapset.clone_overlap[coid.snap].insert(0, ctx->obs->oi.size);
+ if (obs.oi.size)
+ ssc->snapset.clone_overlap[coid.snap].insert(0, obs.oi.size);
// log clone
dout(10) << " cloning v " << oi.version
ctx->at_version.version++;
}
+ // prepend transaction to op_t
+ t.append(ctx->op_t);
+ t.swap(ctx->op_t);
+
// update snapset with latest snap context
ssc->snapset.seq = snapc.seq;
ssc->snapset.snaps = snapc.snaps;
{
assert(!ctx->ops.empty());
- object_info_t *poi = &ctx->obs->oi;
-
- const sobject_t& soid = poi->soid;
+ ObjectState& obs = ctx->new_obs;
+ const sobject_t& soid = obs.oi.soid;
// we'll need this to log
- eversion_t old_version = poi->version;
+ eversion_t old_version = obs.oi.version;
- bool head_existed = ctx->obs->exists;
+ bool head_existed = obs.exists;
// prepare the actual mutation
int result = do_osd_ops(ctx, ctx->ops, ctx->outdata);
- if (result < 0 || ctx->op_t.empty())
+ if (result < 0 ||
+ (ctx->op_t.empty() && !ctx->modify))
return result; // error, or read op.
+ // there was a modification.
+
+ // valid snap context?
+ if (!ctx->snapc.is_valid())
+ return -EINVAL;
+
+ make_writeable(ctx);
+
+ if (ctx->user_modify) {
+ /* update the user_version for any modify ops, except for the watch op */
+ obs.oi.user_version = ctx->at_version;
+ }
+
+ ctx->reply_version = ctx->new_obs.oi.user_version;
+
ctx->bytes_written = ctx->op_t.get_encoded_bytes();
// finish and log the op.
- poi->version = ctx->at_version;
-
+ obs.oi.version = ctx->at_version;
+
bufferlist bss;
::encode(ctx->obc->ssc->snapset, bss);
- assert(ctx->obs->exists == ctx->obc->ssc->snapset.head_exists);
+ assert(obs.exists == ctx->obc->ssc->snapset.head_exists);
// append to log
int logopcode = Log::Entry::MODIFY;
ctx->log.push_back(Log::Entry(logopcode, soid, ctx->at_version, old_version,
ctx->reqid, ctx->mtime));
- if (ctx->obs->exists) {
- poi->version = ctx->at_version;
- poi->prior_version = old_version;
- poi->last_reqid = ctx->reqid;
+ if (obs.exists) {
+ obs.oi.version = ctx->at_version;
+ obs.oi.prior_version = old_version;
+ obs.oi.last_reqid = ctx->reqid;
if (ctx->mtime != utime_t()) {
- poi->mtime = ctx->mtime;
- dout(10) << " set mtime to " << poi->mtime << dendl;
+ obs.oi.mtime = ctx->mtime;
+ dout(10) << " set mtime to " << obs.oi.mtime << dendl;
} else {
- dout(10) << " mtime unchanged at " << poi->mtime << dendl;
+ dout(10) << " mtime unchanged at " << obs.oi.mtime << dendl;
}
- bufferlist bv(sizeof(*poi));
- ::encode(*poi, bv);
+ bufferlist bv(sizeof(obs.oi));
+ ::encode(obs.oi, bv);
ctx->op_t.setattr(coll, soid, OI_ATTR, bv);
dout(10) << " final snapset " << ctx->obc->ssc->snapset
// if we logically recreated the head, remove old _snapdir object
sobject_t snapoid(soid.oid, CEPH_SNAPDIR);
- ctx->snapset_obc = get_object_context(snapoid, poi->oloc, false);
+ ctx->snapset_obc = get_object_context(snapoid, obs.oi.oloc, false);
if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
ctx->op_t.remove(coll, snapoid);
dout(10) << " removing old " << snapoid << dendl;
ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, snapoid, ctx->at_version, old_version,
osd_reqid_t(), ctx->mtime));
- ctx->snapset_obc = get_object_context(snapoid, poi->oloc, true);
+ ctx->snapset_obc = get_object_context(snapoid, obs.oi.oloc, true);
ctx->snapset_obc->obs.exists = true;
ctx->snapset_obc->obs.oi.version = ctx->at_version;
ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
ctx->snapset_obc->obs.oi.mtime = ctx->mtime;
assert(ctx->snapset_obc->registered);
- bufferlist bv(sizeof(*poi));
+ bufferlist bv(sizeof(obs.oi));
::encode(ctx->snapset_obc->obs.oi, bv);
ctx->op_t.touch(coll, snapoid);
ctx->op_t.setattr(coll, snapoid, OI_ATTR, bv);
ctx->op_t.setattr(coll, snapoid, SS_ATTR, bss);
}
+ // apply new object state.
+ *ctx->obs = ctx->new_obs;
+
return result;
}