]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: move ObjectState side effects out of do_osd_ops
authorSage Weil <sage@newdream.net>
Tue, 26 Apr 2011 17:35:03 +0000 (10:35 -0700)
committerSage Weil <sage@newdream.net>
Tue, 26 Apr 2011 17:35:03 +0000 (10:35 -0700)
We want to be able to handle a failure mid-way through an OSDOp
transaction and bail out with no side effects.  This patch

 * puts an ObjectState new_obs in the OoContext that modifications go in
 * only applies if it the transaction is a success
 * only does make_writeable (at the end!) if the transaction is a success

There are still side effects with the watch/notify stuff, though.

Signed-off-by: Sage Weil <sage@newdream.net>
src/osd/ClassHandler.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index bb6ed627379a608438db248dcf12856fa0c62c67..56c6e95f78871970a587d98dc1595e2e6db90112 100644 (file)
@@ -24,6 +24,11 @@ public:
     int exec(cls_method_context_t ctx, bufferlist& indata, bufferlist& outdata);
     void unregister();
 
+    int get_flags() {
+      Mutex::Locker l(cls->handler->mutex);
+      return flags;
+    }
+
     ClassMethod() : cls(0), func(0), cxx_func(0) {}
   };
 
index b63bf0fa667ae124f323a03947f9480c820ed56c..a589d1afcf96893b5b6b84cc53d6812e4511eba6 100644 (file)
@@ -968,25 +968,6 @@ void ReplicatedPG::do_complete_notify(Watch::Notification *notif, ObjectContext
   osd->complete_notify((void *)notif, obc);
 }
 
-int ReplicatedPG::prepare_call(MOSDOp *osd_op, ceph_osd_op& op,
-                              string& cname, string& mname,
-                              bufferlist::iterator& bp,
-                              ClassHandler::ClassMethod **pmethod)
-{
-  ClassHandler::ClassData *cls;
-  int result = osd->class_handler->open_class(cname, &cls);
-  assert(result == 0);
-
-  bufferlist outdata;
-  ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());
-  if (!method) {
-    dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl;
-    return -EINVAL;
-  }
-  *pmethod = method;
-  return 0;
-}
-
 // ========================================================================
 // low level osd ops
 
@@ -995,7 +976,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 {
   int result = 0;
   SnapSetContext *ssc = ctx->obc->ssc;
-  object_info_t& oi = ctx->obs->oi;
+  ObjectState& obs = ctx->new_obs;
+  object_info_t& oi = obs.oi;
 
   const sobject_t& soid = oi.soid;
 
@@ -1009,49 +991,21 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 
     dout(10) << "do_osd_op  " << osd_op << dendl;
 
-    // modify?
-    int flags;
-    bool is_modify;
-    string cname, mname;
     bufferlist::iterator bp = osd_op.data.begin();
+
+    // user-visible modifcation?
     switch (op.op) {
-    case CEPH_OSD_OP_CALL:
-      bp.copy(op.cls.class_len, cname);
-      bp.copy(op.cls.method_len, mname);
-      {
-       ClassHandler::ClassData *cls;
-       int r = osd->class_handler->open_class(cname, &cls);
-       assert(r == 0);
-       flags = cls->get_method_flags(mname.c_str());
-      }
-      is_modify = flags & CLS_METHOD_WR;
-      dout(10) << " class " << cname << "." << mname << " flags " << flags << " is_modify " << is_modify << dendl;
+      // non user-visible modifications
+    case CEPH_OSD_OP_WATCH:
       break;
-
     default:
-      is_modify = (op.op & CEPH_OSD_OP_MODE_WR);
-      break;
+      if (op.op & CEPH_OSD_OP_MODE_WR)
+       ctx->user_modify = true;
     }
 
-    ctx->reply_version = oi.user_version;
-    // make writeable (i.e., clone if necessary)
-    if (is_modify) {
-      if (!ctx->snapc.is_valid())
-        return -EINVAL;
-      make_writeable(ctx);
-
-      if (op.op != CEPH_OSD_OP_WATCH) {
-        /* update the user_version for any modify ops, except for the watch op */
-        oi.user_version = ctx->at_version;
-        ctx->reply_version = oi.user_version;
-      }
-    }
-
-    dout(0) << "oi.user_version=" << oi.user_version << " is_modify=" << is_modify << dendl;
-
     // munge ZERO -> TRUNCATE?  (don't munge to DELETE or we risk hosing attributes)
     if (op.op == CEPH_OSD_OP_ZERO &&
-       ctx->obs->exists &&
+       obs.exists &&
        op.extent.offset + op.extent.length >= oi.size) {
       dout(10) << " munging ZERO " << op.extent.offset << "~" << op.extent.length
               << " -> TRUNCATE " << op.extent.offset << " (old size is " << oi.size << ")" << dendl;
@@ -1174,29 +1128,40 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 
     case CEPH_OSD_OP_CALL:
       {
+       string cname, mname;
+       bp.copy(op.cls.class_len, cname);
+       bp.copy(op.cls.method_len, mname);
+
        bufferlist indata;
        bp.copy(op.cls.indata_len, indata);
 
-       ClassHandler::ClassMethod *method;
-        result = prepare_call((MOSDOp *)ctx->op, op, cname, mname, bp, &method);
-        if (result == -EAGAIN)
-          return result;
-
-        if (!result) {
-         bufferlist outdata;
+       ClassHandler::ClassData *cls;
+       int result = osd->class_handler->open_class(cname, &cls);
+       assert(result == 0);
 
-          dout(10) << "call method " << cname << "." << mname << dendl;
-         result = method->exec((cls_method_context_t)&ctx, indata, outdata);
-         dout(10) << "method called response length=" << outdata.length() << dendl;
-         op.extent.length = outdata.length();
-         odata.claim_append(outdata);
+       ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());
+       if (!method) {
+         dout(10) << "call method " << cname << "." << mname << " does not exist" << dendl;
+         result = -EINVAL;
+         break;
        }
+
+       int flags = method->get_flags();
+       if (flags & CLS_METHOD_WR)
+         ctx->user_modify = true;
+
+       bufferlist outdata;
+       dout(10) << "call method " << cname << "." << mname << dendl;
+       result = method->exec((cls_method_context_t)&ctx, indata, outdata);
+       dout(10) << "method called response length=" << outdata.length() << dendl;
+       op.extent.length = outdata.length();
+       odata.claim_append(outdata);
       }
       break;
 
     case CEPH_OSD_OP_STAT:
       {
-       if (ctx->obs->exists) {
+       if (obs.exists) {
          ::encode(oi.size, odata);
          ::encode(oi.mtime, odata);
          dout(10) << "stat oi has " << oi.size << " " << oi.mtime << dendl;
@@ -1467,7 +1432,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
       { // write full object
        bufferlist nbl;
        bp.copy(op.extent.length, nbl);
-       if (ctx->obs->exists)
+       if (obs.exists)
          t.truncate(coll, soid, 0);
        t.write(coll, soid, op.extent.offset, op.extent.length, nbl);
        if (ssc->snapset.clones.size()) {
@@ -1499,7 +1464,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
     case CEPH_OSD_OP_ZERO:
       { // zero
        assert(op.extent.length);
-       if (ctx->obs->exists) {
+       if (obs.exists) {
          t.zero(coll, soid, op.extent.offset, op.extent.length);
          if (ssc->snapset.clones.size()) {
            snapid_t newest = *ssc->snapset.clones.rbegin();
@@ -1519,7 +1484,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
     case CEPH_OSD_OP_CREATE:
       { // zero
         int flags = le32_to_cpu(op.flags);
-       if (ctx->obs->exists && (flags & CEPH_OSD_OP_FLAG_EXCL))
+       if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL))
           result = -EEXIST; /* this is an exclusive create */
         else {
           t.touch(coll, soid);
@@ -1534,7 +1499,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 
     case CEPH_OSD_OP_TRUNCATE:
       { // truncate
-       if (!ctx->obs->exists) {
+       if (!obs.exists) {
          dout(10) << " object dne, truncate is a no-op" << dendl;
          break;
        }
@@ -1679,14 +1644,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
       
     case CEPH_OSD_OP_SETXATTR:
       {
-       if (!ctx->obs->exists)
+       if (!obs.exists)
          t.touch(coll, soid);
        string aname;
        bp.copy(op.xattr.name_len, aname);
        string name = "_" + aname;
        bufferlist bl;
        bp.copy(op.xattr.value_len, bl);
-       if (!ctx->obs->exists)  // create object if it doesn't yet exist.
+       if (!obs.exists)  // create object if it doesn't yet exist.
          t.touch(coll, soid);
        t.setattr(coll, soid, name, bl);
        ssc->snapset.head_exists = true;
@@ -1992,11 +1957,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
       result = -EOPNOTSUPP;
     }
 
-    if ((is_modify) &&
-       !ctx->obs->exists && ssc->snapset.head_exists) {
+    if (!obs.exists && ssc->snapset.head_exists) {
       dout(20) << " num_objects " << info.stats.num_objects << " -> " << (info.stats.num_objects+1) << dendl;
       info.stats.num_objects++;
-      ctx->obs->exists = true;
+      obs.exists = true;
     }
 
     if (result)
@@ -2008,11 +1972,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 inline void ReplicatedPG::_delete_head(OpContext *ctx)
 {
   SnapSetContext *ssc = ctx->obc->ssc;
-  object_info_t& oi = ctx->obs->oi;
+  ObjectState& obs = ctx->new_obs;
+  object_info_t& oi = obs.oi;
   const sobject_t& soid = oi.soid;
   ObjectStore::Transaction& t = ctx->op_t;
 
-  if (ctx->obs->exists)
+  if (obs.exists)
     t.remove(coll, soid);
   if (ssc->snapset.clones.size()) {
     snapid_t newest = *ssc->snapset.clones.rbegin();
@@ -2023,13 +1988,13 @@ inline void ReplicatedPG::_delete_head(OpContext *ctx)
     ssc->snapset.clone_overlap.erase(newest);  // ok, redundant.
     ssc->snapset.clone_overlap[newest];
   }
-  if (ctx->obs->exists) {
+  if (obs.exists) {
     info.stats.num_objects--;
     info.stats.num_bytes -= oi.size;
     info.stats.num_kb -= SHIFT_ROUND_UP(oi.size, 10);
     oi.size = 0;
     ssc->snapset.head_exists = false;
-    ctx->obs->exists = false;
+    obs.exists = false;
   }      
   info.stats.num_wr++;
 }
@@ -2037,7 +2002,8 @@ inline void ReplicatedPG::_delete_head(OpContext *ctx)
 void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
 {
   SnapSetContext *ssc = ctx->obc->ssc;
-  object_info_t& oi = ctx->obs->oi;
+  ObjectState& obs = ctx->new_obs;
+  object_info_t& oi = obs.oi;
   const sobject_t& soid = oi.soid;
   ObjectStore::Transaction& t = ctx->op_t;
   snapid_t snapid = (uint64_t)op.snap.snapid;
@@ -2064,8 +2030,9 @@ void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
     }
   } else { //we got our context, let's use it to do the rollback!
     sobject_t& rollback_to_sobject = rollback_to->obs.oi.soid;
-    if (ctx->clone_obc && *ctx->clone_obc->obs.oi.snaps.rbegin() <= snapid) {
-      //just cloned the rollback target, we don't need to do anything!
+    if (rollback_to->obs.oi.soid.snap == CEPH_NOSNAP) {
+      // rolling back to the head; we just need to clone it.
+      ctx->modify = true;
     } else {
       /* 1) Delete current head
        * 2) Clone correct snapshot into head
@@ -2075,7 +2042,7 @@ void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
               << " and rolling back to old snap" << dendl;
       
       _delete_head(ctx);
-      ctx->obs->exists = true; //we're about to recreate it
+      obs.exists = true; //we're about to recreate it
       
       map<string, bufferptr> attrs;
       t.clone(coll,
@@ -2091,7 +2058,7 @@ void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
                                                        oi.oloc,
                                                        false);
       assert(clone_context);
-      ctx->obs->oi.size = clone_context->obs.oi.size;
+      obs.oi.size = clone_context->obs.oi.size;
 
       map<snapid_t, interval_set<uint64_t> >::iterator iter =
        ssc->snapset.clone_overlap.lower_bound(snapid);
@@ -2125,10 +2092,11 @@ void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
 void ReplicatedPG::make_writeable(OpContext *ctx)
 {
   SnapSetContext *ssc = ctx->obc->ssc;
-  object_info_t& oi = ctx->obs->oi;
+  ObjectState& obs = ctx->new_obs;
+  object_info_t& oi = obs.oi;
   const sobject_t& soid = oi.soid;
   SnapContext& snapc = ctx->snapc;
-  ObjectStore::Transaction& t = ctx->op_t;
+  ObjectStore::Transaction t;
 
   // clone?
   assert(soid.snap == CEPH_NOSNAP);
@@ -2184,13 +2152,13 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
     info.stats.num_objects++;
     info.stats.num_object_clones++;
     ssc->snapset.clones.push_back(coid.snap);
-    ssc->snapset.clone_size[coid.snap] = ctx->obs->oi.size;
+    ssc->snapset.clone_size[coid.snap] = obs.oi.size;
 
     // clone_overlap should contain an entry for each clone 
     // (an empty interval_set if there is no overlap)
     ssc->snapset.clone_overlap[coid.snap];
-    if (ctx->obs->oi.size)
-      ssc->snapset.clone_overlap[coid.snap].insert(0, ctx->obs->oi.size);
+    if (obs.oi.size)
+      ssc->snapset.clone_overlap[coid.snap].insert(0, obs.oi.size);
     
     // log clone
     dout(10) << " cloning v " << oi.version
@@ -2203,6 +2171,10 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
     ctx->at_version.version++;
   }
   
+  // prepend transaction to op_t
+  t.append(ctx->op_t);
+  t.swap(ctx->op_t);
+
   // update snapset with latest snap context
   ssc->snapset.seq = snapc.seq;
   ssc->snapset.snaps = snapc.snaps;
@@ -2223,28 +2195,43 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
 {
   assert(!ctx->ops.empty());
   
-  object_info_t *poi = &ctx->obs->oi;
-
-  const sobject_t& soid = poi->soid;
+  ObjectState& obs = ctx->new_obs;
+  const sobject_t& soid = obs.oi.soid;
 
   // we'll need this to log
-  eversion_t old_version = poi->version;
+  eversion_t old_version = obs.oi.version;
 
-  bool head_existed = ctx->obs->exists;
+  bool head_existed = obs.exists;
 
   // prepare the actual mutation
   int result = do_osd_ops(ctx, ctx->ops, ctx->outdata);
-  if (result < 0 || ctx->op_t.empty())
+  if (result < 0 ||
+      (ctx->op_t.empty() && !ctx->modify))
     return result;  // error, or read op.
 
+  // there was a modification.
+
+  // valid snap context?
+  if (!ctx->snapc.is_valid())
+    return -EINVAL;
+
+  make_writeable(ctx);
+
+  if (ctx->user_modify) {
+    /* update the user_version for any modify ops, except for the watch op */
+    obs.oi.user_version = ctx->at_version;
+  }
+  
+  ctx->reply_version = ctx->new_obs.oi.user_version;
+
   ctx->bytes_written = ctx->op_t.get_encoded_bytes();
 
   // finish and log the op.
-  poi->version = ctx->at_version;
-  
+  obs.oi.version = ctx->at_version;
   bufferlist bss;
   ::encode(ctx->obc->ssc->snapset, bss);
-  assert(ctx->obs->exists == ctx->obc->ssc->snapset.head_exists);
+  assert(obs.exists == ctx->obc->ssc->snapset.head_exists);
 
   // append to log
   int logopcode = Log::Entry::MODIFY;
@@ -2253,19 +2240,19 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
   ctx->log.push_back(Log::Entry(logopcode, soid, ctx->at_version, old_version,
                                ctx->reqid, ctx->mtime));
 
-  if (ctx->obs->exists) {
-    poi->version = ctx->at_version;
-    poi->prior_version = old_version;
-    poi->last_reqid = ctx->reqid;
+  if (obs.exists) {
+    obs.oi.version = ctx->at_version;
+    obs.oi.prior_version = old_version;
+    obs.oi.last_reqid = ctx->reqid;
     if (ctx->mtime != utime_t()) {
-      poi->mtime = ctx->mtime;
-      dout(10) << " set mtime to " << poi->mtime << dendl;
+      obs.oi.mtime = ctx->mtime;
+      dout(10) << " set mtime to " << obs.oi.mtime << dendl;
     } else {
-      dout(10) << " mtime unchanged at " << poi->mtime << dendl;
+      dout(10) << " mtime unchanged at " << obs.oi.mtime << dendl;
     }
 
-    bufferlist bv(sizeof(*poi));
-    ::encode(*poi, bv);
+    bufferlist bv(sizeof(obs.oi));
+    ::encode(obs.oi, bv);
     ctx->op_t.setattr(coll, soid, OI_ATTR, bv);
 
     dout(10) << " final snapset " << ctx->obc->ssc->snapset
@@ -2275,7 +2262,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
       // if we logically recreated the head, remove old _snapdir object
       sobject_t snapoid(soid.oid, CEPH_SNAPDIR);
 
-      ctx->snapset_obc = get_object_context(snapoid, poi->oloc, false);
+      ctx->snapset_obc = get_object_context(snapoid, obs.oi.oloc, false);
       if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
        ctx->op_t.remove(coll, snapoid);
        dout(10) << " removing old " << snapoid << dendl;
@@ -2297,20 +2284,23 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, snapoid, ctx->at_version, old_version,
                                  osd_reqid_t(), ctx->mtime));
 
-    ctx->snapset_obc = get_object_context(snapoid, poi->oloc, true);
+    ctx->snapset_obc = get_object_context(snapoid, obs.oi.oloc, true);
     ctx->snapset_obc->obs.exists = true;
     ctx->snapset_obc->obs.oi.version = ctx->at_version;
     ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
     ctx->snapset_obc->obs.oi.mtime = ctx->mtime;
     assert(ctx->snapset_obc->registered);
 
-    bufferlist bv(sizeof(*poi));
+    bufferlist bv(sizeof(obs.oi));
     ::encode(ctx->snapset_obc->obs.oi, bv);
     ctx->op_t.touch(coll, snapoid);
     ctx->op_t.setattr(coll, snapoid, OI_ATTR, bv);
     ctx->op_t.setattr(coll, snapoid, SS_ATTR, bss);
   }
 
+  // apply new object state.
+  *ctx->obs = ctx->new_obs;
+
   return result;
 }
 
index defb0b3d6faf1f9acce60c7ae7fc041f6d2ad792..53c3034f09a4a4cdb2a352042083bea4f823ffd7 100644 (file)
@@ -334,6 +334,9 @@ public:
     bufferlist outdata;
 
     ObjectState *obs;
+    ObjectState new_obs;  // resulting ObjectState
+    bool modify;          // (force) modification (even if op_t is empty)
+    bool user_modify;     // user-visible modification
 
     uint64_t bytes_written;
 
@@ -357,9 +360,10 @@ public:
 
     OpContext(Message *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
              ObjectState *_obs, ReplicatedPG *_pg) :
-      op(_op), reqid(_reqid), ops(_ops), obs(_obs),
+      op(_op), reqid(_reqid), ops(_ops), obs(_obs), new_obs(_obs->oi, _obs->exists),
+      modify(false), user_modify(false),
       bytes_written(0),
-      obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) {}
+      obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) { }
     ~OpContext() {
       assert(!clone_obc);
       if (reply)
@@ -642,11 +646,6 @@ protected:
   int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
   int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);
 
-  int prepare_call(MOSDOp *osd_op, ceph_osd_op& op,
-                  string& cname, string& mname,
-                  bufferlist::iterator& bp,
-                  ClassHandler::ClassMethod **pmethod);
-
   bool pgls_filter(PGLSFilter *filter, sobject_t& sobj, bufferlist& outdata);
   int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);