]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: MASKTRUNC and SETTRUNC osd ops
authorSage Weil <sage@newdream.net>
Thu, 29 Jan 2009 18:58:50 +0000 (10:58 -0800)
committerSage Weil <sage@newdream.net>
Fri, 30 Jan 2009 19:44:33 +0000 (11:44 -0800)
src/include/ceph_fs.h
src/include/types.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

index 55329235bdec4ab1b6daa12194d6f53a17c07e5d..c5942e9440a3c69eea295cc2d274b9fb47afafea 100644 (file)
@@ -1155,47 +1155,53 @@ struct ceph_mds_snap_realm {
 #define CEPH_OSD_OP_TYPE_ATTR  0x0300
 
 enum {
+       /** data **/
        /* read */
        CEPH_OSD_OP_READ       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
        CEPH_OSD_OP_STAT       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,
 
-       CEPH_OSD_OP_GETXATTR   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
-       CEPH_OSD_OP_GETXATTRS  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
-
-       /* subop */
-       CEPH_OSD_OP_PULL           = CEPH_OSD_OP_MODE_SUB | 1,
-       CEPH_OSD_OP_PUSH           = CEPH_OSD_OP_MODE_SUB | 2,
-       CEPH_OSD_OP_BALANCEREADS   = CEPH_OSD_OP_MODE_SUB | 3,
-       CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
-       CEPH_OSD_OP_SCRUB          = CEPH_OSD_OP_MODE_SUB | 5,
+       /* fancy read */
+       CEPH_OSD_OP_GREP       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
+       CEPH_OSD_OP_MASKTRUNC  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4,
 
-       /* object data */
+       /* write */
        CEPH_OSD_OP_WRITE      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
        CEPH_OSD_OP_WRITEFULL  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
        CEPH_OSD_OP_TRUNCATE   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 3,
        CEPH_OSD_OP_ZERO       = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 4,
        CEPH_OSD_OP_DELETE     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 5,
 
-       /* object attrs */
+       /* fancy write */
+       CEPH_OSD_OP_APPEND     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
+       CEPH_OSD_OP_STARTSYNC  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 7,
+       CEPH_OSD_OP_SETTRUNC   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 8,
+       CEPH_OSD_OP_TRIMTRUNC  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 9,
+
+       /** attrs **/
+       /* read */
+       CEPH_OSD_OP_GETXATTR   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
+       CEPH_OSD_OP_GETXATTRS  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
+
+       /* write */
        CEPH_OSD_OP_SETXATTR   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1,
        CEPH_OSD_OP_SETXATTRS  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 2,
        CEPH_OSD_OP_RESETXATTRS= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 3,
        CEPH_OSD_OP_RMXATTR    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4,
 
-       /* lock */
+       /** subop **/
+       CEPH_OSD_OP_PULL           = CEPH_OSD_OP_MODE_SUB | 1,
+       CEPH_OSD_OP_PUSH           = CEPH_OSD_OP_MODE_SUB | 2,
+       CEPH_OSD_OP_BALANCEREADS   = CEPH_OSD_OP_MODE_SUB | 3,
+       CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
+       CEPH_OSD_OP_SCRUB          = CEPH_OSD_OP_MODE_SUB | 5,
+
+       /** lock **/
        CEPH_OSD_OP_WRLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
        CEPH_OSD_OP_WRUNLOCK   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 2,
        CEPH_OSD_OP_RDLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 3,
        CEPH_OSD_OP_RDUNLOCK   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 4,
        CEPH_OSD_OP_UPLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 5,
        CEPH_OSD_OP_DNLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
-
-       /* fancy read */
-       CEPH_OSD_OP_GREP       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
-
-       /* fancy write */
-       CEPH_OSD_OP_APPEND     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
-       CEPH_OSD_OP_STARTSYNC  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 7,
 };
 
 static inline int ceph_osd_op_type_lock(int op)
@@ -1230,33 +1236,39 @@ static inline const char *ceph_osd_op_name(int op)
        case CEPH_OSD_OP_READ: return "read";
        case CEPH_OSD_OP_STAT: return "stat";
 
+       case CEPH_OSD_OP_GREP: return "grep";
+       case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
+
        case CEPH_OSD_OP_WRITE: return "write";
        case CEPH_OSD_OP_DELETE: return "delete";
        case CEPH_OSD_OP_TRUNCATE: return "truncate";
        case CEPH_OSD_OP_ZERO: return "zero";
        case CEPH_OSD_OP_WRITEFULL: return "writefull";
 
+       case CEPH_OSD_OP_APPEND: return "append";
+       case CEPH_OSD_OP_STARTSYNC: return "startsync";
+       case CEPH_OSD_OP_SETTRUNC: return "settrunc";
+       case CEPH_OSD_OP_TRIMTRUNC: return "trimtrunc";
+
+       case CEPH_OSD_OP_GETXATTR: return "getxattr";
+       case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
        case CEPH_OSD_OP_SETXATTR: return "setxattr";
        case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
        case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
        case CEPH_OSD_OP_RMXATTR: return "rmxattr";
 
-       case CEPH_OSD_OP_WRLOCK: return "wrlock";
-       case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
-       case CEPH_OSD_OP_RDLOCK: return "rdlock";
-       case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
-       case CEPH_OSD_OP_UPLOCK: return "uplock";
-       case CEPH_OSD_OP_DNLOCK: return "dnlock";
-
        case CEPH_OSD_OP_PULL: return "pull";
        case CEPH_OSD_OP_PUSH: return "push";
        case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
        case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
        case CEPH_OSD_OP_SCRUB: return "scrub";
 
-       case CEPH_OSD_OP_GREP: return "grep";
-       case CEPH_OSD_OP_APPEND: return "append";
-       case CEPH_OSD_OP_STARTSYNC: return "startsync";
+       case CEPH_OSD_OP_WRLOCK: return "wrlock";
+       case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
+       case CEPH_OSD_OP_RDLOCK: return "rdlock";
+       case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
+       case CEPH_OSD_OP_UPLOCK: return "uplock";
+       case CEPH_OSD_OP_DNLOCK: return "dnlock";
 
        default: return "???";
        }
@@ -1286,12 +1298,15 @@ struct ceph_osd_op {
        union {
                struct {
                        __le64 offset, length;
-                       __le32 seq;
                };
                struct {
                        __le32 name_len;
                        __le32 value_len;
                };
+               struct {
+                       __le64 truncate_size;
+                       __le32 truncate_seq;
+               };
        };
 } __attribute__ ((packed));
 
index eed004fa66ca92cca48508897680083b661c0eda..c6664cabef24615cac2d1e3d4c9196855f7e80c7 100644 (file)
@@ -380,9 +380,14 @@ inline ostream& operator<<(ostream& out, const ceph_fsid_t& f) {
 
 inline ostream& operator<<(ostream& out, const ceph_osd_op& op) {
   out << ceph_osd_op_name(op.op);
-  if (ceph_osd_op_type_data(op.op))
-    out << " " << op.offset << "~" << op.length;
-  else if (ceph_osd_op_type_attr(op.op))
+  if (ceph_osd_op_type_data(op.op)) {
+    if (op.op == CEPH_OSD_OP_SETTRUNC ||
+       op.op == CEPH_OSD_OP_MASKTRUNC ||
+       op.op == CEPH_OSD_OP_TRIMTRUNC)
+      out << " " << op.truncate_seq << "@" << op.truncate_size;      
+    else
+      out << " " << op.offset << "~" << op.length;
+  } else if (ceph_osd_op_type_attr(op.op))
     out << " " << op.name_len << "+" << op.value_len;
   return out;
 }
index 3d2ff1d830517ec2cfc28d78ca2b3599682a8e3b..1c1e73c5ef0fd6746b53f02b92ead24c917f5930 100644 (file)
@@ -755,6 +755,69 @@ void ReplicatedPG::op_read(MOSDOp *op)
       }
       break;
       
+    case CEPH_OSD_OP_MASKTRUNC:
+      if (p != op->ops.begin()) {
+       ceph_osd_op& rd = *(p - 1);
+       ceph_osd_op& m = *p;
+
+       // are we beyond truncate_size?
+       if (rd.offset + rd.length > m.truncate_size) {  
+         __u32 seq;
+         interval_set<__u64> tm;
+         bufferlist::iterator p = oi.truncate_info.begin();
+         ::decode(seq, p);
+         ::decode(tm, p);
+
+         // truncated portion of the read
+         unsigned from = MAX(rd.offset, m.truncate_size);  // also end of data
+         unsigned to = rd.offset + rd.length;
+         unsigned trim = to-from;
+
+         rd.length = rd.length - trim;
+
+         dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl;
+
+         bufferlist keep;
+         keep.substr_of(data, 0, data.length() - trim);
+         bufferlist truncated;  // everthing after 'from'
+         truncated.substr_of(data, data.length() - trim, trim);
+         keep.swap(data);
+
+         if (seq == rd.truncate_seq) {
+           // keep any valid extents beyond 'from'
+           unsigned data_end = from;
+           for (map<__u64,__u64>::iterator q = tm.m.begin();
+                q != tm.m.end();
+                q++) {
+             unsigned s = MAX(q->first, from);
+             unsigned e = MIN(q->first+q->second, to);
+             if (e > s) {
+               unsigned l = e-s;
+               dout(10) << "   " << q->first << "~" << q->second << " overlap " << s << "~" << l << dendl;
+
+               // add in zeros?
+               if (s > data_end) {
+                 bufferptr bp(s-from);
+                 bp.zero();
+                 data.push_back(bp);
+                 dout(20) << "  adding " << bp.length() << " zeros" << dendl;
+                 rd.length = rd.length + bp.length();
+                 data_end += bp.length();
+               }
+               
+               bufferlist b;
+               b.substr_of(truncated, s-from, l);
+               dout(20) << "  adding " << b.length() << " bytes from " << s << "~" << l << dendl;
+               data.claim_append(b);
+               rd.length = rd.length + l;
+               data_end += l;
+             }
+           } // for
+         } // seq == rd.truncate_eq
+       }
+      }
+      break;
+
     default:
       dout(1) << "unrecognized osd op " << p->op
              << " " << ceph_osd_op_name(p->op)
@@ -891,9 +954,10 @@ void ReplicatedPG::add_interval_usage(interval_set<__u64>& s, pg_stat_t& stats)
 // low level object operations
 int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
                                    pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
-                                   ceph_osd_op& op, bufferlist::iterator& bp,
+                                   vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp,
                                    SnapContext& snapc)
 {
+  ceph_osd_op& op = ops[opn];
   int eop = op.op;
 
   // munge ZERO -> DELETE or TRUNCATE?
@@ -1094,11 +1158,12 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
     {
       // just do it inline; this works because we are happy to execute
       // fancy op on replicas as well.
-      ceph_osd_op newop;
+      vector<ceph_osd_op> nops(1);
+      ceph_osd_op& newop = nops[0];
       newop.op = CEPH_OSD_OP_WRITE;
       newop.offset = old_size;
       newop.length = op.length;
-      prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, newop, bp, snapc);
+      prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
     }
     break;
 
@@ -1106,6 +1171,34 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
     t.start_sync();
     break;
 
+  case CEPH_OSD_OP_SETTRUNC:
+    if (opn > 0 && ops[opn-1].op == CEPH_OSD_OP_WRITE) {
+      // set truncate seq over preceeding write's range
+      ceph_osd_op& wr = ops[opn-1];
+
+      __u32 seq = 0;
+      interval_set<__u64> tm;
+      bufferlist::iterator p;
+      if (oi.truncate_info.length()) {
+       p = oi.truncate_info.begin();
+       ::decode(seq, p);
+      }
+      if (seq < op.truncate_seq) {
+       seq = op.truncate_seq;
+       tm.insert(wr.offset, wr.length);
+      } else {
+       if (oi.truncate_info.length())
+         ::decode(tm, p);
+       interval_set<__u64> n;
+       n.insert(wr.offset, wr.length);
+       tm.union_of(n);
+      }
+      oi.truncate_info.clear();
+      ::encode(seq, oi.truncate_info);
+      ::encode(tm, oi.truncate_info);
+    }
+    break;
+
   default:
     return -EINVAL;
   }
@@ -1143,7 +1236,7 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
       did_snap = true;
     }
     prepare_simple_op(t, reqid, info.stats, poid, size, exists, oi,
-                     ops[i], bp, snapc);
+                     ops, i, bp, snapc);
   }
 
   // finish.
index 62f746d695d21845ab89f954574a0905d033db11..d0d22de041b0510ba24ca94b8b105f9fdbd12d70 100644 (file)
@@ -185,7 +185,7 @@ protected:
   void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);  
   int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
                        pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
-                       ceph_osd_op& op, bufferlist::iterator& bp, SnapContext& snapc); 
+                       vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp, SnapContext& snapc); 
   void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
                           pobject_t poid, 
                           vector<ceph_osd_op>& ops, bufferlist& bl,
index d98acfde2496af1c1ef065ed48df4de39ca4be8d..c416bd0a7950297e9e9c41af5f576a40d31dd30c 100644 (file)
@@ -728,6 +728,8 @@ struct object_info_t {
   SnapSet snapset;         // [head]
   vector<snapid_t> snaps;  // [clone]
 
+  bufferlist truncate_info;  // bah.. messy layering.
+
   void encode(bufferlist& bl) const {
     ::encode(poid, bl);
     ::encode(version, bl);
@@ -739,6 +741,7 @@ struct object_info_t {
       ::encode(wrlock_by, bl);
     } else
       ::encode(snaps, bl);
+    ::encode(truncate_info, bl);
   }
   void decode(bufferlist::iterator& bl) {
     ::decode(poid, bl);
@@ -751,6 +754,7 @@ struct object_info_t {
       ::decode(wrlock_by, bl);
     } else
       ::decode(snaps, bl);
+    ::decode(truncate_info, bl);
   }
   void decode(bufferlist& bl) {
     bufferlist::iterator p = bl.begin();