uint64_t size; // on directory, # dentries
uint32_t truncate_seq;
uint64_t truncate_size, truncate_from;
+ uint32_t truncate_pending;
utime_t mtime; // file data modify time.
utime_t atime; // file data access time.
uint32_t time_warp_seq; // count of (potential) mtime/atime timewarps (i.e., utimes())
mode(0), uid(0), gid(0),
nlink(0), anchored(false),
size(0), truncate_seq(0), truncate_size(0), truncate_from(0),
+ truncate_pending(0),
time_warp_seq(0),
version(0), file_data_version(0), xattr_version(0), last_renamed_version(0) {
memset(&layout, 0, sizeof(layout));
bool is_dir() const { return (mode & S_IFMT) == S_IFDIR; }
bool is_file() const { return (mode & S_IFMT) == S_IFREG; }
- bool is_truncating() const { return truncate_size != -1ull; }
+ bool is_truncating() const { return (truncate_pending > 0); }
int64_t get_layout_size_increment() {
return layout.fl_object_size * layout.fl_stripe_count;
}
info.stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
info.stats.num_rd++;
- dout(10) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl;
+ dout(0) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl;
+
+ __u32 seq = oi.truncate_seq;
+ // are we beyond truncate_size?
+ if ( (seq < op.extent.truncate_seq) &&
+ (op.extent.offset + op.extent.length > op.extent.truncate_size) ) {
+
+ // truncated portion of the read
+ unsigned from = MAX(op.extent.offset, op.extent.truncate_size); // also end of data
+ unsigned to = op.extent.offset + op.extent.length;
+ unsigned trim = to-from;
+
+ op.extent.length = op.extent.length - trim;
+
+ bufferlist keep;
+
+ // keep first part of odata; trim at truncation point
+ dout(0) << " obj " << soid << " seq " << seq
+ << ": trimming overlap " << from << "~" << trim << dendl;
+ keep.substr_of(odata, 0, odata.length() - trim);
+ odata.claim(keep);
+ }
}
break;
}
break;
- case CEPH_OSD_OP_MASKTRUNC:
- if (p != ops.begin()) {
- OSDOp& rd = *(p - 1);
- OSDOp& m = *p;
-
- // are we beyond truncate_size?
- if (rd.op.extent.offset + rd.op.extent.length > m.op.trunc.truncate_size) {
- __u32 seq = 0;
- interval_set<__u64> tm;
- if (oi.truncate_info.length()) {
- bufferlist::iterator p = oi.truncate_info.begin();
- ::decode(seq, p);
- ::decode(tm, p);
- }
-
- // truncated portion of the read
- unsigned from = MAX(rd.op.extent.offset, m.op.trunc.truncate_size); // also end of data
- unsigned to = rd.op.extent.offset + rd.op.extent.length;
- unsigned trim = to-from;
-
- rd.op.extent.length = rd.op.extent.length - trim;
-
- dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl;
-
- bufferlist keep;
- keep.substr_of(odata, 0, odata.length() - trim);
- bufferlist truncated; // everthing after 'from'
- truncated.substr_of(odata, odata.length() - trim, trim);
- keep.swap(odata);
-
- if (seq == m.op.trunc.truncate_seq) {
- // keep any valid extents beyond 'from'
- unsigned data_end = from;
- for (map<__u64,__u64>::iterator q = tm.m.begin();
- q != tm.m.end();
- q++) {
- unsigned s = MAX(q->first, from);
- unsigned e = MIN(q->first+q->second, to);
- if (e > s) {
- unsigned l = e-s;
- dout(10) << " " << q->first << "~" << q->second << " overlap " << s << "~" << l << dendl;
-
- // add in zeros?
- if (s > data_end) {
- bufferptr bp(s-from);
- bp.zero();
- odata.push_back(bp);
- dout(20) << " adding " << bp.length() << " zeros" << dendl;
- rd.op.extent.length = rd.op.extent.length + bp.length();
- data_end += bp.length();
- }
-
- bufferlist b;
- b.substr_of(truncated, s-from, l);
- dout(20) << " adding " << b.length() << " bytes from " << s << "~" << l << dendl;
- odata.claim_append(b);
- rd.op.extent.length = rd.op.extent.length + l;
- data_end += l;
- }
- } // for
- } // seq == rd.truncate_eq
- }
- }
- break;
-
// --- WRITES ---
t.start_sync();
break;
- case CEPH_OSD_OP_SETTRUNC:
- if (p != ops.begin()) {
- // set truncate seq over preceeding write's range
- OSDOp& wr = *(p - 1);
-
- __u32 seq = 0;
- interval_set<__u64> tm;
- bufferlist::iterator p;
- if (oi.truncate_info.length()) {
- p = oi.truncate_info.begin();
- ::decode(seq, p);
- }
- if (seq < op.trunc.truncate_seq) {
- seq = op.trunc.truncate_seq;
- tm.insert(wr.op.extent.offset, wr.op.extent.length);
- } else {
- if (oi.truncate_info.length())
- ::decode(tm, p);
- interval_set<__u64> n;
- n.insert(wr.op.extent.offset, wr.op.extent.length);
- tm.union_of(n);
- }
- dout(10) << " settrunc seq " << seq << " map " << tm << dendl;
- oi.truncate_info.clear();
- ::encode(seq, oi.truncate_info);
- ::encode(tm, oi.truncate_info);
- }
- break;
-
case CEPH_OSD_OP_TRIMTRUNC:
if (ctx->obs->exists) {
- __u32 old_seq = 0;
- bufferlist::iterator p;
- if (oi.truncate_info.length()) {
- p = oi.truncate_info.begin();
- ::decode(old_seq, p);
- }
+ __u32 old_seq = oi.truncate_seq;
- if (op.trunc.truncate_seq > old_seq) {
- // just truncate/delete.
+ if (op.extent.truncate_seq > old_seq) {
+ // do the truncate/delete.
vector<OSDOp> nops(1);
OSDOp& newop = nops[0];
newop.op.op = CEPH_OSD_OP_TRUNCATE;
- newop.op.extent.offset = op.trunc.truncate_size;
+ newop.op.extent.offset = op.extent.truncate_size;
newop.data = osd_op.data;
- dout(10) << " seq " << op.trunc.truncate_seq << " > old_seq " << old_seq
+ dout(10) << " seq " << op.extent.truncate_seq << " > old_seq " << old_seq
<< ", truncating with " << newop << dendl;
do_osd_ops(ctx, nops, odata);
+
+ // remember!
+ oi.truncate_seq = op.extent.truncate_seq;
+ oi.truncate_size = op.extent.truncate_size;
} else {
- // do smart truncate
- interval_set<__u64> tm;
- ::decode(tm, p);
-
- interval_set<__u64> zero;
- zero.insert(0, oi.size);
- tm.intersection_of(zero);
- zero.subtract(tm);
-
- dout(10) << " seq " << op.trunc.truncate_seq << " == old_seq " << old_seq
- << ", tm " << tm << ", zeroing " << zero << dendl;
- for (map<__u64,__u64>::iterator p = zero.m.begin();
- p != zero.m.end();
- p++) {
- vector<OSDOp> nops(1);
- OSDOp& newop = nops[0];
- newop.op.op = CEPH_OSD_OP_ZERO;
- newop.op.extent.offset = p->first;
- newop.op.extent.length = p->second;
- newop.data = osd_op.data;
- do_osd_ops(ctx, nops, odata);
- }
-
- oi.truncate_info.clear();
+ // object was already truncated, no-op.
}
}
break;
osd_reqid_t wrlock_by; // [head]
vector<snapid_t> snaps; // [clone]
- bufferlist truncate_info; // bah.. messy layering.
+ __u64 truncate_seq, truncate_size;
void encode(bufferlist& bl) const {
const __u8 v = 1;
::encode(wrlock_by, bl);
else
::encode(snaps, bl);
- ::encode(truncate_info, bl);
+ ::encode(truncate_seq, bl);
+ ::encode(truncate_size, bl);
}
void decode(bufferlist::iterator& bl) {
__u8 v;
::decode(wrlock_by, bl);
else
::decode(snaps, bl);
- ::decode(truncate_info, bl);
+ ::decode(truncate_seq, bl);
+ ::decode(truncate_size, bl);
}
void decode(bufferlist& bl) {
bufferlist::iterator p = bl.begin();
case CEPH_OSD_OP_TRUNCATE:
out << " " << op.op.extent.offset;
break;
- case CEPH_OSD_OP_SETTRUNC:
case CEPH_OSD_OP_MASKTRUNC:
case CEPH_OSD_OP_TRIMTRUNC:
- out << " " << op.op.trunc.truncate_seq << "@" << op.op.trunc.truncate_size;
+ out << " " << op.op.extent.truncate_seq << "@" << op.op.extent.truncate_size;
break;
default:
out << " " << op.op.extent.offset << "~" << op.op.extent.length;
+ out << " [" << op.op.extent.truncate_seq << "@" << op.op.extent.truncate_size << "]";
}
} else if (ceph_osd_op_type_attr(op.op.op)) {
// xattr name
if (extents.size() == 1) {
vector<OSDOp> ops(1);
ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
- ops[0].op.trunc.truncate_seq = truncate_seq;
- ops[0].op.trunc.truncate_size = extents[0].offset;
+ ops[0].op.extent.truncate_seq = truncate_seq;
+ ops[0].op.extent.truncate_size = extents[0].offset;
objecter->_modify(extents[0].oid, extents[0].layout, ops, mtime, snapc, flags, onack, oncommit);
} else {
C_Gather *gack = 0, *gcom = 0;
for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
vector<OSDOp> ops(1);
ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
- ops[0].op.trunc.truncate_size = p->offset;
- ops[0].op.trunc.truncate_seq = truncate_seq;
+ ops[0].op.extent.truncate_size = p->offset;
+ ops[0].op.extent.truncate_seq = truncate_seq;
objecter->_modify(extents[0].oid, p->layout, ops, mtime, snapc, flags,
gack ? gack->new_sub():0,
gcom ? gcom->new_sub():0);