]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: Fill in rollback info for log events
authorSamuel Just <sam.just@inktank.com>
Wed, 4 Dec 2013 00:14:55 +0000 (16:14 -0800)
committerSamuel Just <sam.just@inktank.com>
Wed, 22 Jan 2014 22:39:16 +0000 (14:39 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

index 643b70261d1b1b1e2de1526d478c729455058029..b1f05df6cf3ff62eba6e2126c7b830f0966f2fde 100644 (file)
@@ -2144,7 +2144,6 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     // remove clone
     dout(10) << coid << " snaps " << old_snaps << " -> "
             << new_snaps << " ... deleting" << dendl;
-    t->remove(coid);
 
     // ...from snapset
     snapid_t last = coid.snap;
@@ -2193,6 +2192,20 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
        osd_reqid_t(),
        ctx->mtime)
       );
+    if (pool.info.ec_pool()) {
+      set<snapid_t> snaps(
+       ctx->obc->obs.oi.snaps.begin(),
+       ctx->obc->obs.oi.snaps.end());
+      ctx->log.back().mod_desc.update_snaps(snaps);
+      if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
+       t->stash(coid, ctx->at_version.version);
+      } else {
+       t->remove(coid);
+      }
+    } else {
+      t->remove(coid);
+      ctx->log.back().mod_desc.mark_unrollbackable();
+    }
     ctx->at_version.version++;
   } else {
     // save adjusted snaps for this object
@@ -2216,6 +2229,18 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
        osd_reqid_t(),
        ctx->mtime)
       );
+    if (pool.info.ec_pool()) {
+      set<string> changing;
+      changing.insert(OI_ATTR);
+      ctx->obc->fill_in_setattrs(changing, &(ctx->log.back().mod_desc));
+      set<snapid_t> snaps(
+       ctx->obc->obs.oi.snaps.begin(),
+       ctx->obc->obs.oi.snaps.end());
+      ctx->log.back().mod_desc.update_snaps(snaps);
+    } else {
+      ctx->log.back().mod_desc.mark_unrollbackable();
+    }
+    
     ::encode(coi.snaps, ctx->log.back().snaps);
     ctx->at_version.version++;
   }
@@ -2241,9 +2266,19 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
        osd_reqid_t(),
        ctx->mtime)
       );
-    ctx->snapset_obc->obs.exists = false;
 
-    t->remove(snapoid);
+    ctx->snapset_obc->obs.exists = false;
+    
+    if (pool.info.ec_pool()) {
+      if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
+       t->stash(snapoid, ctx->at_version.version);
+      } else {
+       t->remove(snapoid);
+      }
+    } else {
+      t->remove(snapoid);
+      ctx->log.back().mod_desc.mark_unrollbackable();
+    }
   } else {
     dout(10) << coid << " updating snapset on " << snapoid << dendl;
     ctx->log.push_back(
@@ -2268,6 +2303,15 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     bl.clear();
     ::encode(ctx->snapset_obc->obs.oi, bl);
     setattr_maybe_cache(ctx->snapset_obc, ctx, t, OI_ATTR, bl);
+
+    if (pool.info.ec_pool()) {
+      set<string> changing;
+      changing.insert(OI_ATTR);
+      changing.insert(SS_ATTR);
+      ctx->snapset_obc->fill_in_setattrs(changing, &(ctx->log.back().mod_desc));
+    } else {
+      ctx->log.back().mod_desc.mark_unrollbackable();
+    }
   }
 
   return repop;
@@ -2795,6 +2839,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     /* map extents */
     case CEPH_OSD_OP_MAPEXT:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       {
        // read into a buffer
@@ -2811,6 +2859,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     /* map extents */
     case CEPH_OSD_OP_SPARSE_READ:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       {
         if (op.extent.truncate_seq) {
@@ -3380,6 +3432,19 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -EINVAL;
          break;
        }
+
+       if (!obs.exists) {
+         ctx->mod_desc.create();
+       } else if (op.extent.offset == oi.size) {
+         ctx->mod_desc.append(oi.size);
+       } else {
+         ctx->mod_desc.mark_unrollbackable();
+         if (pool.info.ec_pool()) {
+           result = -EOPNOTSUPP;
+           break;
+         }
+       }
+
         __u32 seq = oi.truncate_seq;
         if (seq && (seq > op.extent.truncate_seq) &&
             (op.extent.offset + op.extent.length > oi.size)) {
@@ -3434,13 +3499,31 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size);
        if (result < 0)
          break;
-       if (obs.exists) {
-         t->truncate(soid, 0);
+
+       if (pool.info.ec_pool()) {
+         if (obs.exists) {
+           if (ctx->mod_desc.rmobject(oi.version.version)) {
+             t->stash(soid, oi.version.version);
+           } else {
+             t->remove(soid);
+           }
+         }
+         ctx->mod_desc.create();
+         t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
+         if (obs.exists) {
+           t->setattrs(soid, ctx->obc->attr_cache);
+         }
        } else {
+         ctx->mod_desc.mark_unrollbackable();
+         if (obs.exists) {
+           t->truncate(soid, 0);
+         }
+         t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
+       }
+       if (!obs.exists) {
          ctx->delta_stats.num_objects++;
          obs.exists = true;
        }
-       t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
        interval_set<uint64_t> ch;
        if (oi.size > 0)
          ch.insert(0, oi.size);
@@ -3461,6 +3544,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_ZERO:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_write;
       { // zero
        result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size);
@@ -3468,6 +3555,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        assert(op.extent.length);
        if (obs.exists && !oi.is_whiteout()) {
+         ctx->mod_desc.mark_unrollbackable();
          t->zero(soid, op.extent.offset, op.extent.length);
          interval_set<uint64_t> ch;
          ch.insert(op.extent.offset, op.extent.length);
@@ -3506,6 +3594,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            }
          }
          if (result >= 0 && !obs.exists) {
+           ctx->mod_desc.create();
            t->touch(soid);
            ctx->delta_stats.num_objects++;
            obs.exists = true;
@@ -3519,7 +3608,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // falling through
 
     case CEPH_OSD_OP_TRUNCATE:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_write;
+      ctx->mod_desc.mark_unrollbackable();
       {
        // truncate
        if (!obs.exists || oi.is_whiteout()) {
@@ -3572,6 +3666,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_CLONERANGE:
+      ctx->mod_desc.mark_unrollbackable();
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       ++ctx->num_write;
       {
@@ -3650,6 +3749,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        }
        if (!obs.exists) {
+         ctx->mod_desc.create();
          t->touch(soid);
          ctx->delta_stats.num_objects++;
          obs.exists = true;
@@ -3657,6 +3757,19 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        string aname;
        bp.copy(op.xattr.name_len, aname);
        string name = "_" + aname;
+       if (pool.info.ec_pool()) {
+         map<string, boost::optional<bufferlist> > to_set;
+         bufferlist old;
+         int r = getattr_maybe_cache(ctx->obc, name, &old);
+         if (r == 0) {
+           to_set[name] = old;
+         } else {
+           to_set[name];
+         }
+         ctx->mod_desc.setattrs(to_set);
+       } else {
+         ctx->mod_desc.mark_unrollbackable();
+       }
        bufferlist bl;
        bp.copy(op.xattr.value_len, bl);
        setattr_maybe_cache(ctx->obc, ctx, t, name, bl);
@@ -3670,6 +3783,19 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        string aname;
        bp.copy(op.xattr.name_len, aname);
        string name = "_" + aname;
+       if (pool.info.ec_pool()) {
+         map<string, boost::optional<bufferlist> > to_set;
+         bufferlist old;
+         int r = getattr_maybe_cache(ctx->obc, name, &old);
+         if (r == 0) {
+           to_set[name] = old;
+         } else {
+           to_set[name];
+         }
+         ctx->mod_desc.setattrs(to_set);
+       } else {
+         ctx->mod_desc.mark_unrollbackable();
+       }
        rmattr_maybe_cache(ctx->obc, ctx, t, name);
        ctx->delta_stats.num_wr++;
       }
@@ -3780,6 +3906,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
       // OMAP Read ops
     case CEPH_OSD_OP_OMAPGETKEYS:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       {
        string start_after;
@@ -3813,6 +3943,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_OMAPGETVALS:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       {
        string start_after;
@@ -3854,6 +3988,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_OMAPGETHEADER:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       {
        osd->store->omap_get_header(coll, soid, &osd_op.outdata);
@@ -3863,6 +4001,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       {
        set<string> keys_to_get;
@@ -3882,6 +4024,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_OMAP_CMP:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
       ++ctx->num_read;
       {
        if (!obs.exists || oi.is_whiteout()) {
@@ -3949,6 +4095,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
       // OMAP Write ops
     case CEPH_OSD_OP_OMAPSETVALS:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
+      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
        if (!obs.exists) {
@@ -3976,6 +4127,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_OMAPSETHEADER:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
+      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
        if (!obs.exists) {
@@ -3989,6 +4145,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_OMAPCLEAR:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
+      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
        if (!obs.exists || oi.is_whiteout()) {
@@ -4002,6 +4163,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_OMAPRMKEYS:
+      if (pool.info.ec_pool()) {
+       result = -EOPNOTSUPP;
+       break;
+      }
+      ctx->mod_desc.mark_unrollbackable();
       ++ctx->num_write;
       {
        if (!obs.exists || oi.is_whiteout()) {
@@ -4131,8 +4297,17 @@ inline int ReplicatedPG::_delete_head(OpContext *ctx, bool no_whiteout)
 
   if (!obs.exists || (obs.oi.is_whiteout() && !no_whiteout))
     return -ENOENT;
-  
-  t->remove(soid);
+
+  if (pool.info.ec_pool()) {
+    if (ctx->mod_desc.rmobject(oi.version.version)) {
+      t->stash(soid, oi.version.version);
+    } else {
+      t->remove(soid);
+    }
+  } else {
+    ctx->mod_desc.mark_unrollbackable();
+    t->remove(soid);
+  }
 
   if (oi.size > 0) {
     interval_set<uint64_t> ch;
@@ -4227,9 +4402,21 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
       dout(10) << "_rollback_to deleting " << soid.oid
               << " and rolling back to old snap" << dendl;
 
-      if (obs.exists)
-       t->remove(soid);
-      
+      if (pool.info.ec_pool()) {
+       if (obs.exists) {
+         if (ctx->mod_desc.rmobject(oi.version.version)) {
+           t->stash(soid, oi.version.version);
+         } else {
+           t->remove(soid);
+         }
+       }
+      } else {
+       if (obs.exists) {
+         ctx->mod_desc.mark_unrollbackable();
+         t->remove(soid);
+       }
+      }
+      ctx->mod_desc.create();
       t->clone(rollback_to_sobject, soid);
       snapset.head_exists = true;
 
@@ -4377,6 +4564,7 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
                                      ctx->obs->oi.user_version,
                                      osd_reqid_t(), ctx->new_obs.oi.mtime));
     ::encode(snaps, ctx->log.back().snaps);
+    ctx->log.back().mod_desc.create();
 
     ctx->at_version.version++;
   }
@@ -4606,13 +4794,22 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
 
        ctx->snapset_obc = get_object_context(snapoid, false);
        if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
-         ctx->op_t->remove(snapoid);
+         ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::DELETE, snapoid,
+             ctx->at_version,
+             ctx->obs->oi.version,
+             0, osd_reqid_t(), ctx->mtime));
+         if (pool.info.ec_pool()) {
+           if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
+             ctx->op_t->stash(snapoid, ctx->at_version.version);
+           } else {
+             ctx->op_t->remove(snapoid);
+           }
+         } else {
+           ctx->op_t->remove(snapoid);
+           ctx->log.back().mod_desc.mark_unrollbackable();
+         }
          dout(10) << " removing old " << snapoid << dendl;
 
-         ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::DELETE, snapoid,
-                                           ctx->at_version,
-                                           ctx->obs->oi.version,
-                                           0, osd_reqid_t(), ctx->mtime));
          ctx->at_version.version++;
 
          ctx->snapset_obc->obs.exists = false;
@@ -4630,6 +4827,11 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
                                        0, osd_reqid_t(), ctx->mtime));
 
       ctx->snapset_obc = get_object_context(snapoid, true);
+      if (pool.info.ec_pool() && !ctx->snapset_obc->obs.exists) {
+       ctx->log.back().mod_desc.create();
+      } else if (!pool.info.ec_pool()) {
+       ctx->log.back().mod_desc.mark_unrollbackable();
+      }
       ctx->snapset_obc->obs.exists = true;
       ctx->snapset_obc->obs.oi.version = ctx->at_version;
       ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
@@ -4640,6 +4842,14 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
       ctx->op_t->touch(snapoid);
       setattr_maybe_cache(ctx->snapset_obc, ctx, ctx->op_t, OI_ATTR, bv);
       setattr_maybe_cache(ctx->snapset_obc, ctx, ctx->op_t, SS_ATTR, bss);
+      if (pool.info.ec_pool()) {
+       map<string, boost::optional<bufferlist> > to_set;
+       to_set[SS_ATTR];
+       to_set[OI_ATTR];
+       ctx->log.back().mod_desc.setattrs(to_set);
+      } else {
+       ctx->log.back().mod_desc.mark_unrollbackable();
+      }
       ctx->at_version.version++;
     }
   }
@@ -4678,6 +4888,13 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
       dout(10) << " final snapset " << ctx->new_snapset
               << " in " << soid << dendl;
       setattr_maybe_cache(ctx->obc, ctx, ctx->op_t, SS_ATTR, bss);
+
+      if (pool.info.ec_pool()) {
+       set<string> changing;
+       changing.insert(OI_ATTR);
+       changing.insert(SS_ATTR);
+       ctx->obc->fill_in_setattrs(changing, &(ctx->mod_desc));
+      }
     } else {
       dout(10) << " no snapset (this is a clone)" << dendl;
     }
@@ -4708,6 +4925,8 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
     }
   }
 
+  ctx->log.back().mod_desc.claim(ctx->mod_desc);
+
   // apply new object state.
   ctx->obc->obs = ctx->new_obs;
   ctx->obc->ssc->snapset = ctx->new_snapset;
@@ -5845,6 +6064,15 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
 
   repop->ctx->apply_pending_attrs();
 
+  if (pool.info.ec_pool()) {
+    for (vector<pg_log_entry_t>::iterator i = repop->ctx->log.begin();
+        i != repop->ctx->log.end();
+        ++i) {
+      assert(i->mod_desc.can_rollback());
+      assert(!i->mod_desc.empty());
+    }
+  }
+
   Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
   Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
   Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
@@ -6138,6 +6366,14 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
   ::encode(obc->obs.oi, bl);
   setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl);
 
+  if (pool.info.ec_pool()) {
+    map<string, boost::optional<bufferlist> > to_set;
+    to_set[OI_ATTR] = bl;
+    ctx->log.back().mod_desc.setattrs(to_set);
+  } else {
+    ctx->log.back().mod_desc.mark_unrollbackable();
+  }
+
   // obc ref swallowed by repop!
   issue_repop(repop, repop->ctx->mtime);
   eval_repop(repop);
index 6069a0506ece70d9c1c8a8a4cdacee55ae7fbe70..e3d23a388516f3ea519cc3e00f45bf8d53866f35 100644 (file)
@@ -449,6 +449,8 @@ public:
       pending_attrs.clear();
     }
 
+    ObjectModDesc mod_desc;
+
     enum { W_LOCK, R_LOCK, NONE } lock_to_release;
 
     OpContext(const OpContext& other);
index 750640679b5c679e6ab4cdd15fe19490eff4e116..303e328aa4deb5af8a65b45e79df68b41b3ca748 100644 (file)
@@ -2628,6 +2628,21 @@ public:
 
   // attr cache
   map<string, bufferlist> attr_cache;
+
+  void fill_in_setattrs(const set<string> &changing, ObjectModDesc *mod) {
+    map<string, boost::optional<bufferlist> > to_set;
+    for (set<string>::const_iterator i = changing.begin();
+        i != changing.end();
+        ++i) {
+      map<string, bufferlist>::iterator iter = attr_cache.find(*i);
+      if (iter != attr_cache.end()) {
+       to_set[*i] = iter->second;
+      } else {
+       to_set[*i];
+      }
+    }
+    mod->setattrs(to_set);
+  }
 };
 
 inline ostream& operator<<(ostream& out, const ObjectState& obs)