]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: fix truncate ops; add TRIMTRUNC
authorSage Weil <sage@newdream.net>
Thu, 29 Jan 2009 22:47:46 +0000 (14:47 -0800)
committerSage Weil <sage@newdream.net>
Fri, 30 Jan 2009 19:44:33 +0000 (11:44 -0800)
src/osd/ReplicatedPG.cc

index 1c1e73c5ef0fd6746b53f02b92ead24c917f5930..73ef69ec2d7824aba1c1a5f422ba1f483c5f1aad 100644 (file)
@@ -762,11 +762,13 @@ void ReplicatedPG::op_read(MOSDOp *op)
 
        // are we beyond truncate_size?
        if (rd.offset + rd.length > m.truncate_size) {  
-         __u32 seq;
+         __u32 seq = 0;
          interval_set<__u64> tm;
-         bufferlist::iterator p = oi.truncate_info.begin();
-         ::decode(seq, p);
-         ::decode(tm, p);
+         if (oi.truncate_info.length()) {
+           bufferlist::iterator p = oi.truncate_info.begin();
+           ::decode(seq, p);
+           ::decode(tm, p);
+         }
 
          // truncated portion of the read
          unsigned from = MAX(rd.offset, m.truncate_size);  // also end of data
@@ -960,6 +962,8 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
   ceph_osd_op& op = ops[opn];
   int eop = op.op;
 
+  dout(15) << "prepare_simple_op " << reqid << " " << opn << ": " << ops[opn] << " on " << oi << dendl;
+
   // munge ZERO -> DELETE or TRUNCATE?
   if (eop == CEPH_OSD_OP_ZERO &&
       oi.snapset.head_exists &&
@@ -972,7 +976,7 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       //eop = CEPH_OSD_OP_DELETE;
     } else {
       dout(10) << " munging ZERO " << op.offset << "~" << op.length
-              << " -> TRUNCATE (size is " << old_size << ")" << dendl;
+              << " -> TRUNCATE " << op.offset << " (old size is " << old_size << ")" << dendl;
       eop = CEPH_OSD_OP_TRUNCATE;
       oi.snapset.head_exists = true;
     }
@@ -980,9 +984,9 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
   // munge DELETE -> TRUNCATE?
   if (eop == CEPH_OSD_OP_DELETE &&
       oi.snapset.clones.size()) {
-    dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << oi.snapset.clones << dendl;
+    dout(10) << " munging DELETE -> TRUNCATE 0 bc of clones " << oi.snapset.clones << dendl;
     eop = CEPH_OSD_OP_TRUNCATE;
-    op.length = 0;
+    op.offset = 0;
     oi.snapset.head_exists = false;
   }
 
@@ -1080,26 +1084,26 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
     { // truncate
       if (!exists)
        t.touch(info.pgid.to_coll(), poid);
-      t.truncate(info.pgid.to_coll(), poid, op.length);
+      t.truncate(info.pgid.to_coll(), poid, op.offset);
       if (oi.snapset.clones.size()) {
        snapid_t newest = *oi.snapset.clones.rbegin();
        interval_set<__u64> trim;
-       if (old_size > op.length) {
-         trim.insert(op.length, old_size-op.length);
+       if (old_size > op.offset) {
+         trim.insert(op.offset, old_size-op.offset);
          trim.intersection_of(oi.snapset.clone_overlap[newest]);
          add_interval_usage(trim, st);
        }
        interval_set<__u64> keep;
-       if (op.length)
-         keep.insert(0, op.length);
+       if (op.offset)
+         keep.insert(0, op.offset);
        oi.snapset.clone_overlap[newest].intersection_of(keep);
       }
-      if (op.length != old_size) {
+      if (op.offset != old_size) {
        st.num_bytes -= old_size;
        st.num_kb -= SHIFT_ROUND_UP(old_size, 10);
-       st.num_bytes += op.length;
-       st.num_kb += SHIFT_ROUND_UP(op.length, 10);
-       old_size = op.length;
+       st.num_bytes += op.offset;
+       st.num_kb += SHIFT_ROUND_UP(op.offset, 10);
+       old_size = op.offset;
       }
       // do no set head_exists, or we will break above DELETE -> TRUNCATE munging.
     }
@@ -1193,12 +1197,59 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
        n.insert(wr.offset, wr.length);
        tm.union_of(n);
       }
+      dout(10) << " settrunc seq " << seq << " map " << tm << dendl;
       oi.truncate_info.clear();
       ::encode(seq, oi.truncate_info);
       ::encode(tm, oi.truncate_info);
     }
     break;
 
+  case CEPH_OSD_OP_TRIMTRUNC:
+    if (exists) {
+      __u32 old_seq = 0;
+      bufferlist::iterator p;
+      if (oi.truncate_info.length()) {
+       p = oi.truncate_info.begin();
+       ::decode(old_seq, p);
+      }
+      
+      if (op.truncate_seq > old_seq) {
+       // just truncate/delete.
+       vector<ceph_osd_op> nops(1);
+       ceph_osd_op& newop = nops[0];
+       newop.op = CEPH_OSD_OP_TRUNCATE;
+       newop.offset = op.truncate_size;
+       dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq
+                << ", truncating with " << newop << dendl;
+       prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
+      } else {
+       // do smart truncate
+       interval_set<__u64> tm;
+       ::decode(tm, p);
+
+       interval_set<__u64> zero;
+       zero.insert(0, old_size);
+       tm.intersection_of(zero);
+       zero.subtract(tm);
+       
+       dout(10) << " seq " << op.truncate_seq << " == old_seq " << old_seq
+                << ", tm " << tm << ", zeroing " << zero << dendl;
+       for (map<__u64,__u64>::iterator p = zero.m.begin();
+            p != zero.m.end();
+            p++) {
+         vector<ceph_osd_op> nops(1);
+         ceph_osd_op& newop = nops[0];
+         newop.op = CEPH_OSD_OP_ZERO;
+         newop.offset = p->first;
+         newop.length = p->second;
+         prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
+       }
+
+       oi.truncate_info.clear();
+      }
+    }
+    break;
+
   default:
     return -EINVAL;
   }