From e643642b85a4050cb1f968188b71e87e088470da Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 26 Apr 2011 10:35:03 -0700 Subject: [PATCH] osd: move ObjectState side effects out of do_osd_ops We want to be able to handle a failure mid-way through an OSDOp transaction and bail out with no side effects. This patch * puts an ObjectState new_obs in the OoContext that modifications go in * only applies if it the transaction is a success * only does make_writeable (at the end!) if the transaction is a success There are still side effects with the watch/notify stuff, though. Signed-off-by: Sage Weil --- src/osd/ClassHandler.h | 5 + src/osd/ReplicatedPG.cc | 214 +++++++++++++++++++--------------------- src/osd/ReplicatedPG.h | 13 ++- 3 files changed, 113 insertions(+), 119 deletions(-) diff --git a/src/osd/ClassHandler.h b/src/osd/ClassHandler.h index bb6ed627379a6..56c6e95f78871 100644 --- a/src/osd/ClassHandler.h +++ b/src/osd/ClassHandler.h @@ -24,6 +24,11 @@ public: int exec(cls_method_context_t ctx, bufferlist& indata, bufferlist& outdata); void unregister(); + int get_flags() { + Mutex::Locker l(cls->handler->mutex); + return flags; + } + ClassMethod() : cls(0), func(0), cxx_func(0) {} }; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index b63bf0fa667ae..a589d1afcf968 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -968,25 +968,6 @@ void ReplicatedPG::do_complete_notify(Watch::Notification *notif, ObjectContext osd->complete_notify((void *)notif, obc); } -int ReplicatedPG::prepare_call(MOSDOp *osd_op, ceph_osd_op& op, - string& cname, string& mname, - bufferlist::iterator& bp, - ClassHandler::ClassMethod **pmethod) -{ - ClassHandler::ClassData *cls; - int result = osd->class_handler->open_class(cname, &cls); - assert(result == 0); - - bufferlist outdata; - ClassHandler::ClassMethod *method = cls->get_method(mname.c_str()); - if (!method) { - dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl; - return -EINVAL; - } - *pmethod = method; - return 0; -} - // ======================================================================== // low level osd ops @@ -995,7 +976,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, { int result = 0; SnapSetContext *ssc = ctx->obc->ssc; - object_info_t& oi = ctx->obs->oi; + ObjectState& obs = ctx->new_obs; + object_info_t& oi = obs.oi; const sobject_t& soid = oi.soid; @@ -1009,49 +991,21 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, dout(10) << "do_osd_op " << osd_op << dendl; - // modify? - int flags; - bool is_modify; - string cname, mname; bufferlist::iterator bp = osd_op.data.begin(); + + // user-visible modifcation? switch (op.op) { - case CEPH_OSD_OP_CALL: - bp.copy(op.cls.class_len, cname); - bp.copy(op.cls.method_len, mname); - { - ClassHandler::ClassData *cls; - int r = osd->class_handler->open_class(cname, &cls); - assert(r == 0); - flags = cls->get_method_flags(mname.c_str()); - } - is_modify = flags & CLS_METHOD_WR; - dout(10) << " class " << cname << "." << mname << " flags " << flags << " is_modify " << is_modify << dendl; + // non user-visible modifications + case CEPH_OSD_OP_WATCH: break; - default: - is_modify = (op.op & CEPH_OSD_OP_MODE_WR); - break; + if (op.op & CEPH_OSD_OP_MODE_WR) + ctx->user_modify = true; } - ctx->reply_version = oi.user_version; - // make writeable (i.e., clone if necessary) - if (is_modify) { - if (!ctx->snapc.is_valid()) - return -EINVAL; - make_writeable(ctx); - - if (op.op != CEPH_OSD_OP_WATCH) { - /* update the user_version for any modify ops, except for the watch op */ - oi.user_version = ctx->at_version; - ctx->reply_version = oi.user_version; - } - } - - dout(0) << "oi.user_version=" << oi.user_version << " is_modify=" << is_modify << dendl; - // munge ZERO -> TRUNCATE? (don't munge to DELETE or we risk hosing attributes) if (op.op == CEPH_OSD_OP_ZERO && - ctx->obs->exists && + obs.exists && op.extent.offset + op.extent.length >= oi.size) { dout(10) << " munging ZERO " << op.extent.offset << "~" << op.extent.length << " -> TRUNCATE " << op.extent.offset << " (old size is " << oi.size << ")" << dendl; @@ -1174,29 +1128,40 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_CALL: { + string cname, mname; + bp.copy(op.cls.class_len, cname); + bp.copy(op.cls.method_len, mname); + bufferlist indata; bp.copy(op.cls.indata_len, indata); - ClassHandler::ClassMethod *method; - result = prepare_call((MOSDOp *)ctx->op, op, cname, mname, bp, &method); - if (result == -EAGAIN) - return result; - - if (!result) { - bufferlist outdata; + ClassHandler::ClassData *cls; + int result = osd->class_handler->open_class(cname, &cls); + assert(result == 0); - dout(10) << "call method " << cname << "." << mname << dendl; - result = method->exec((cls_method_context_t)&ctx, indata, outdata); - dout(10) << "method called response length=" << outdata.length() << dendl; - op.extent.length = outdata.length(); - odata.claim_append(outdata); + ClassHandler::ClassMethod *method = cls->get_method(mname.c_str()); + if (!method) { + dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl; + result = -EINVAL; + break; } + + int flags = method->get_flags(); + if (flags & CLS_METHOD_WR) + ctx->user_modify = true; + + bufferlist outdata; + dout(10) << "call method " << cname << "." << mname << dendl; + result = method->exec((cls_method_context_t)&ctx, indata, outdata); + dout(10) << "method called response length=" << outdata.length() << dendl; + op.extent.length = outdata.length(); + odata.claim_append(outdata); } break; case CEPH_OSD_OP_STAT: { - if (ctx->obs->exists) { + if (obs.exists) { ::encode(oi.size, odata); ::encode(oi.mtime, odata); dout(10) << "stat oi has " << oi.size << " " << oi.mtime << dendl; @@ -1467,7 +1432,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, { // write full object bufferlist nbl; bp.copy(op.extent.length, nbl); - if (ctx->obs->exists) + if (obs.exists) t.truncate(coll, soid, 0); t.write(coll, soid, op.extent.offset, op.extent.length, nbl); if (ssc->snapset.clones.size()) { @@ -1499,7 +1464,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_ZERO: { // zero assert(op.extent.length); - if (ctx->obs->exists) { + if (obs.exists) { t.zero(coll, soid, op.extent.offset, op.extent.length); if (ssc->snapset.clones.size()) { snapid_t newest = *ssc->snapset.clones.rbegin(); @@ -1519,7 +1484,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_CREATE: { // zero int flags = le32_to_cpu(op.flags); - if (ctx->obs->exists && (flags & CEPH_OSD_OP_FLAG_EXCL)) + if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL)) result = -EEXIST; /* this is an exclusive create */ else { t.touch(coll, soid); @@ -1534,7 +1499,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_TRUNCATE: { // truncate - if (!ctx->obs->exists) { + if (!obs.exists) { dout(10) << " object dne, truncate is a no-op" << dendl; break; } @@ -1679,14 +1644,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_SETXATTR: { - if (!ctx->obs->exists) + if (!obs.exists) t.touch(coll, soid); string aname; bp.copy(op.xattr.name_len, aname); string name = "_" + aname; bufferlist bl; bp.copy(op.xattr.value_len, bl); - if (!ctx->obs->exists) // create object if it doesn't yet exist. + if (!obs.exists) // create object if it doesn't yet exist. t.touch(coll, soid); t.setattr(coll, soid, name, bl); ssc->snapset.head_exists = true; @@ -1992,11 +1957,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, result = -EOPNOTSUPP; } - if ((is_modify) && - !ctx->obs->exists && ssc->snapset.head_exists) { + if (!obs.exists && ssc->snapset.head_exists) { dout(20) << " num_objects " << info.stats.num_objects << " -> " << (info.stats.num_objects+1) << dendl; info.stats.num_objects++; - ctx->obs->exists = true; + obs.exists = true; } if (result) @@ -2008,11 +1972,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, inline void ReplicatedPG::_delete_head(OpContext *ctx) { SnapSetContext *ssc = ctx->obc->ssc; - object_info_t& oi = ctx->obs->oi; + ObjectState& obs = ctx->new_obs; + object_info_t& oi = obs.oi; const sobject_t& soid = oi.soid; ObjectStore::Transaction& t = ctx->op_t; - if (ctx->obs->exists) + if (obs.exists) t.remove(coll, soid); if (ssc->snapset.clones.size()) { snapid_t newest = *ssc->snapset.clones.rbegin(); @@ -2023,13 +1988,13 @@ inline void ReplicatedPG::_delete_head(OpContext *ctx) ssc->snapset.clone_overlap.erase(newest); // ok, redundant. ssc->snapset.clone_overlap[newest]; } - if (ctx->obs->exists) { + if (obs.exists) { info.stats.num_objects--; info.stats.num_bytes -= oi.size; info.stats.num_kb -= SHIFT_ROUND_UP(oi.size, 10); oi.size = 0; ssc->snapset.head_exists = false; - ctx->obs->exists = false; + obs.exists = false; } info.stats.num_wr++; } @@ -2037,7 +2002,8 @@ inline void ReplicatedPG::_delete_head(OpContext *ctx) void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) { SnapSetContext *ssc = ctx->obc->ssc; - object_info_t& oi = ctx->obs->oi; + ObjectState& obs = ctx->new_obs; + object_info_t& oi = obs.oi; const sobject_t& soid = oi.soid; ObjectStore::Transaction& t = ctx->op_t; snapid_t snapid = (uint64_t)op.snap.snapid; @@ -2064,8 +2030,9 @@ void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) } } else { //we got our context, let's use it to do the rollback! sobject_t& rollback_to_sobject = rollback_to->obs.oi.soid; - if (ctx->clone_obc && *ctx->clone_obc->obs.oi.snaps.rbegin() <= snapid) { - //just cloned the rollback target, we don't need to do anything! + if (rollback_to->obs.oi.soid.snap == CEPH_NOSNAP) { + // rolling back to the head; we just need to clone it. + ctx->modify = true; } else { /* 1) Delete current head * 2) Clone correct snapshot into head @@ -2075,7 +2042,7 @@ void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) << " and rolling back to old snap" << dendl; _delete_head(ctx); - ctx->obs->exists = true; //we're about to recreate it + obs.exists = true; //we're about to recreate it map attrs; t.clone(coll, @@ -2091,7 +2058,7 @@ void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) oi.oloc, false); assert(clone_context); - ctx->obs->oi.size = clone_context->obs.oi.size; + obs.oi.size = clone_context->obs.oi.size; map >::iterator iter = ssc->snapset.clone_overlap.lower_bound(snapid); @@ -2125,10 +2092,11 @@ void ReplicatedPG::_make_clone(ObjectStore::Transaction& t, void ReplicatedPG::make_writeable(OpContext *ctx) { SnapSetContext *ssc = ctx->obc->ssc; - object_info_t& oi = ctx->obs->oi; + ObjectState& obs = ctx->new_obs; + object_info_t& oi = obs.oi; const sobject_t& soid = oi.soid; SnapContext& snapc = ctx->snapc; - ObjectStore::Transaction& t = ctx->op_t; + ObjectStore::Transaction t; // clone? assert(soid.snap == CEPH_NOSNAP); @@ -2184,13 +2152,13 @@ void ReplicatedPG::make_writeable(OpContext *ctx) info.stats.num_objects++; info.stats.num_object_clones++; ssc->snapset.clones.push_back(coid.snap); - ssc->snapset.clone_size[coid.snap] = ctx->obs->oi.size; + ssc->snapset.clone_size[coid.snap] = obs.oi.size; // clone_overlap should contain an entry for each clone // (an empty interval_set if there is no overlap) ssc->snapset.clone_overlap[coid.snap]; - if (ctx->obs->oi.size) - ssc->snapset.clone_overlap[coid.snap].insert(0, ctx->obs->oi.size); + if (obs.oi.size) + ssc->snapset.clone_overlap[coid.snap].insert(0, obs.oi.size); // log clone dout(10) << " cloning v " << oi.version @@ -2203,6 +2171,10 @@ void ReplicatedPG::make_writeable(OpContext *ctx) ctx->at_version.version++; } + // prepend transaction to op_t + t.append(ctx->op_t); + t.swap(ctx->op_t); + // update snapset with latest snap context ssc->snapset.seq = snapc.seq; ssc->snapset.snaps = snapc.snaps; @@ -2223,28 +2195,43 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) { assert(!ctx->ops.empty()); - object_info_t *poi = &ctx->obs->oi; - - const sobject_t& soid = poi->soid; + ObjectState& obs = ctx->new_obs; + const sobject_t& soid = obs.oi.soid; // we'll need this to log - eversion_t old_version = poi->version; + eversion_t old_version = obs.oi.version; - bool head_existed = ctx->obs->exists; + bool head_existed = obs.exists; // prepare the actual mutation int result = do_osd_ops(ctx, ctx->ops, ctx->outdata); - if (result < 0 || ctx->op_t.empty()) + if (result < 0 || + (ctx->op_t.empty() && !ctx->modify)) return result; // error, or read op. + // there was a modification. + + // valid snap context? + if (!ctx->snapc.is_valid()) + return -EINVAL; + + make_writeable(ctx); + + if (ctx->user_modify) { + /* update the user_version for any modify ops, except for the watch op */ + obs.oi.user_version = ctx->at_version; + } + + ctx->reply_version = ctx->new_obs.oi.user_version; + ctx->bytes_written = ctx->op_t.get_encoded_bytes(); // finish and log the op. - poi->version = ctx->at_version; - + obs.oi.version = ctx->at_version; + bufferlist bss; ::encode(ctx->obc->ssc->snapset, bss); - assert(ctx->obs->exists == ctx->obc->ssc->snapset.head_exists); + assert(obs.exists == ctx->obc->ssc->snapset.head_exists); // append to log int logopcode = Log::Entry::MODIFY; @@ -2253,19 +2240,19 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) ctx->log.push_back(Log::Entry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime)); - if (ctx->obs->exists) { - poi->version = ctx->at_version; - poi->prior_version = old_version; - poi->last_reqid = ctx->reqid; + if (obs.exists) { + obs.oi.version = ctx->at_version; + obs.oi.prior_version = old_version; + obs.oi.last_reqid = ctx->reqid; if (ctx->mtime != utime_t()) { - poi->mtime = ctx->mtime; - dout(10) << " set mtime to " << poi->mtime << dendl; + obs.oi.mtime = ctx->mtime; + dout(10) << " set mtime to " << obs.oi.mtime << dendl; } else { - dout(10) << " mtime unchanged at " << poi->mtime << dendl; + dout(10) << " mtime unchanged at " << obs.oi.mtime << dendl; } - bufferlist bv(sizeof(*poi)); - ::encode(*poi, bv); + bufferlist bv(sizeof(obs.oi)); + ::encode(obs.oi, bv); ctx->op_t.setattr(coll, soid, OI_ATTR, bv); dout(10) << " final snapset " << ctx->obc->ssc->snapset @@ -2275,7 +2262,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) // if we logically recreated the head, remove old _snapdir object sobject_t snapoid(soid.oid, CEPH_SNAPDIR); - ctx->snapset_obc = get_object_context(snapoid, poi->oloc, false); + ctx->snapset_obc = get_object_context(snapoid, obs.oi.oloc, false); if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) { ctx->op_t.remove(coll, snapoid); dout(10) << " removing old " << snapoid << dendl; @@ -2297,20 +2284,23 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, snapoid, ctx->at_version, old_version, osd_reqid_t(), ctx->mtime)); - ctx->snapset_obc = get_object_context(snapoid, poi->oloc, true); + ctx->snapset_obc = get_object_context(snapoid, obs.oi.oloc, true); ctx->snapset_obc->obs.exists = true; ctx->snapset_obc->obs.oi.version = ctx->at_version; ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid; ctx->snapset_obc->obs.oi.mtime = ctx->mtime; assert(ctx->snapset_obc->registered); - bufferlist bv(sizeof(*poi)); + bufferlist bv(sizeof(obs.oi)); ::encode(ctx->snapset_obc->obs.oi, bv); ctx->op_t.touch(coll, snapoid); ctx->op_t.setattr(coll, snapoid, OI_ATTR, bv); ctx->op_t.setattr(coll, snapoid, SS_ATTR, bss); } + // apply new object state. + *ctx->obs = ctx->new_obs; + return result; } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index defb0b3d6faf1..53c3034f09a4a 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -334,6 +334,9 @@ public: bufferlist outdata; ObjectState *obs; + ObjectState new_obs; // resulting ObjectState + bool modify; // (force) modification (even if op_t is empty) + bool user_modify; // user-visible modification uint64_t bytes_written; @@ -357,9 +360,10 @@ public: OpContext(Message *_op, osd_reqid_t _reqid, vector& _ops, ObjectState *_obs, ReplicatedPG *_pg) : - op(_op), reqid(_reqid), ops(_ops), obs(_obs), + op(_op), reqid(_reqid), ops(_ops), obs(_obs), new_obs(_obs->oi, _obs->exists), + modify(false), user_modify(false), bytes_written(0), - obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) {} + obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) { } ~OpContext() { assert(!clone_obc); if (reply) @@ -642,11 +646,6 @@ protected: int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr); int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr); - int prepare_call(MOSDOp *osd_op, ceph_osd_op& op, - string& cname, string& mname, - bufferlist::iterator& bp, - ClassHandler::ClassMethod **pmethod); - bool pgls_filter(PGLSFilter *filter, sobject_t& sobj, bufferlist& outdata); int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter); -- 2.39.5