// are we beyond truncate_size?
if (rd.offset + rd.length > m.truncate_size) {
- __u32 seq;
+ __u32 seq = 0;
interval_set<__u64> tm;
- bufferlist::iterator p = oi.truncate_info.begin();
- ::decode(seq, p);
- ::decode(tm, p);
+ if (oi.truncate_info.length()) {
+ bufferlist::iterator p = oi.truncate_info.begin();
+ ::decode(seq, p);
+ ::decode(tm, p);
+ }
// truncated portion of the read
unsigned from = MAX(rd.offset, m.truncate_size); // also end of data
ceph_osd_op& op = ops[opn];
int eop = op.op;
+ dout(15) << "prepare_simple_op " << reqid << " " << opn << ": " << ops[opn] << " on " << oi << dendl;
+
// munge ZERO -> DELETE or TRUNCATE?
if (eop == CEPH_OSD_OP_ZERO &&
oi.snapset.head_exists &&
//eop = CEPH_OSD_OP_DELETE;
} else {
dout(10) << " munging ZERO " << op.offset << "~" << op.length
- << " -> TRUNCATE (size is " << old_size << ")" << dendl;
+ << " -> TRUNCATE " << op.offset << " (old size is " << old_size << ")" << dendl;
eop = CEPH_OSD_OP_TRUNCATE;
oi.snapset.head_exists = true;
}
// munge DELETE -> TRUNCATE?
if (eop == CEPH_OSD_OP_DELETE &&
oi.snapset.clones.size()) {
- dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << oi.snapset.clones << dendl;
+ dout(10) << " munging DELETE -> TRUNCATE 0 bc of clones " << oi.snapset.clones << dendl;
eop = CEPH_OSD_OP_TRUNCATE;
- op.length = 0;
+ op.offset = 0;
oi.snapset.head_exists = false;
}
{ // truncate
if (!exists)
t.touch(info.pgid.to_coll(), poid);
- t.truncate(info.pgid.to_coll(), poid, op.length);
+ t.truncate(info.pgid.to_coll(), poid, op.offset);
if (oi.snapset.clones.size()) {
snapid_t newest = *oi.snapset.clones.rbegin();
interval_set<__u64> trim;
- if (old_size > op.length) {
- trim.insert(op.length, old_size-op.length);
+ if (old_size > op.offset) {
+ trim.insert(op.offset, old_size-op.offset);
trim.intersection_of(oi.snapset.clone_overlap[newest]);
add_interval_usage(trim, st);
}
interval_set<__u64> keep;
- if (op.length)
- keep.insert(0, op.length);
+ if (op.offset)
+ keep.insert(0, op.offset);
oi.snapset.clone_overlap[newest].intersection_of(keep);
}
- if (op.length != old_size) {
+ if (op.offset != old_size) {
st.num_bytes -= old_size;
st.num_kb -= SHIFT_ROUND_UP(old_size, 10);
- st.num_bytes += op.length;
- st.num_kb += SHIFT_ROUND_UP(op.length, 10);
- old_size = op.length;
+ st.num_bytes += op.offset;
+ st.num_kb += SHIFT_ROUND_UP(op.offset, 10);
+ old_size = op.offset;
}
// do no set head_exists, or we will break above DELETE -> TRUNCATE munging.
}
n.insert(wr.offset, wr.length);
tm.union_of(n);
}
+ dout(10) << " settrunc seq " << seq << " map " << tm << dendl;
oi.truncate_info.clear();
::encode(seq, oi.truncate_info);
::encode(tm, oi.truncate_info);
}
break;
+ case CEPH_OSD_OP_TRIMTRUNC:
+ if (exists) {
+ __u32 old_seq = 0;
+ bufferlist::iterator p;
+ if (oi.truncate_info.length()) {
+ p = oi.truncate_info.begin();
+ ::decode(old_seq, p);
+ }
+
+ if (op.truncate_seq > old_seq) {
+ // just truncate/delete.
+ vector<ceph_osd_op> nops(1);
+ ceph_osd_op& newop = nops[0];
+ newop.op = CEPH_OSD_OP_TRUNCATE;
+ newop.offset = op.truncate_size;
+ dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq
+ << ", truncating with " << newop << dendl;
+ prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
+ } else {
+ // do smart truncate
+ interval_set<__u64> tm;
+ ::decode(tm, p);
+
+ interval_set<__u64> zero;
+ zero.insert(0, old_size);
+ tm.intersection_of(zero);
+ zero.subtract(tm);
+
+ dout(10) << " seq " << op.truncate_seq << " == old_seq " << old_seq
+ << ", tm " << tm << ", zeroing " << zero << dendl;
+ for (map<__u64,__u64>::iterator p = zero.m.begin();
+ p != zero.m.end();
+ p++) {
+ vector<ceph_osd_op> nops(1);
+ ceph_osd_op& newop = nops[0];
+ newop.op = CEPH_OSD_OP_ZERO;
+ newop.offset = p->first;
+ newop.length = p->second;
+ prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
+ }
+
+ oi.truncate_info.clear();
+ }
+ }
+ break;
+
default:
return -EINVAL;
}