#define CEPH_OSD_OP_TYPE_ATTR 0x0300
/*
 * OSD op codes.
 *
 * Each value is the bitwise OR of a mode constant (CEPH_OSD_OP_MODE_RD,
 * _WR or _SUB), a type constant (CEPH_OSD_OP_TYPE_DATA, _ATTR or _LOCK)
 * and a small index that distinguishes ops sharing the same mode and type.
 * The sub-ops are the exception: they are MODE_SUB | index with no type
 * constant.
 */
enum {
+ /** data **/
/* read */
CEPH_OSD_OP_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
CEPH_OSD_OP_STAT = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,
- CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
- CEPH_OSD_OP_GETXATTRS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
-
- /* subop */
- CEPH_OSD_OP_PULL = CEPH_OSD_OP_MODE_SUB | 1,
- CEPH_OSD_OP_PUSH = CEPH_OSD_OP_MODE_SUB | 2,
- CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3,
- CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
- CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5,
+ /* fancy read */
+ CEPH_OSD_OP_GREP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
+ CEPH_OSD_OP_MASKTRUNC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4,
- /* object data */
+ /* write */
CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
CEPH_OSD_OP_TRUNCATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 3,
CEPH_OSD_OP_ZERO = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 4,
CEPH_OSD_OP_DELETE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 5,
- /* object attrs */
+ /* fancy write */
+ CEPH_OSD_OP_APPEND = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
+ CEPH_OSD_OP_STARTSYNC = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 7,
+ CEPH_OSD_OP_SETTRUNC = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 8,
+ CEPH_OSD_OP_TRIMTRUNC = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 9,
+
+ /** attrs **/
+ /* read */
+ CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
+ CEPH_OSD_OP_GETXATTRS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
+
+ /* write */
CEPH_OSD_OP_SETXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1,
CEPH_OSD_OP_SETXATTRS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 2,
CEPH_OSD_OP_RESETXATTRS= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 3,
CEPH_OSD_OP_RMXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4,
- /* lock */
+ /** subop **/
+ CEPH_OSD_OP_PULL = CEPH_OSD_OP_MODE_SUB | 1,
+ CEPH_OSD_OP_PUSH = CEPH_OSD_OP_MODE_SUB | 2,
+ CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3,
+ CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
+ CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5,
+
+ /** lock **/
/* NOTE(review): every lock op below, including RDLOCK/RDUNLOCK, is built on MODE_WR -- confirm this is intended. */
CEPH_OSD_OP_WRLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
CEPH_OSD_OP_WRUNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 2,
CEPH_OSD_OP_RDLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 3,
CEPH_OSD_OP_RDUNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 4,
CEPH_OSD_OP_UPLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 5,
CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
-
- /* fancy read */
- CEPH_OSD_OP_GREP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
-
- /* fancy write */
- CEPH_OSD_OP_APPEND = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
- CEPH_OSD_OP_STARTSYNC = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 7,
};
static inline int ceph_osd_op_type_lock(int op)
case CEPH_OSD_OP_READ: return "read";
case CEPH_OSD_OP_STAT: return "stat";
+ case CEPH_OSD_OP_GREP: return "grep";
+ case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
+
case CEPH_OSD_OP_WRITE: return "write";
case CEPH_OSD_OP_DELETE: return "delete";
case CEPH_OSD_OP_TRUNCATE: return "truncate";
case CEPH_OSD_OP_ZERO: return "zero";
case CEPH_OSD_OP_WRITEFULL: return "writefull";
+ case CEPH_OSD_OP_APPEND: return "append";
+ case CEPH_OSD_OP_STARTSYNC: return "startsync";
+ case CEPH_OSD_OP_SETTRUNC: return "settrunc";
+ case CEPH_OSD_OP_TRIMTRUNC: return "trimtrunc";
+
+ case CEPH_OSD_OP_GETXATTR: return "getxattr";
+ case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
case CEPH_OSD_OP_SETXATTR: return "setxattr";
case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
case CEPH_OSD_OP_RMXATTR: return "rmxattr";
- case CEPH_OSD_OP_WRLOCK: return "wrlock";
- case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
- case CEPH_OSD_OP_RDLOCK: return "rdlock";
- case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
- case CEPH_OSD_OP_UPLOCK: return "uplock";
- case CEPH_OSD_OP_DNLOCK: return "dnlock";
-
case CEPH_OSD_OP_PULL: return "pull";
case CEPH_OSD_OP_PUSH: return "push";
case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
case CEPH_OSD_OP_SCRUB: return "scrub";
- case CEPH_OSD_OP_GREP: return "grep";
- case CEPH_OSD_OP_APPEND: return "append";
- case CEPH_OSD_OP_STARTSYNC: return "startsync";
+ case CEPH_OSD_OP_WRLOCK: return "wrlock";
+ case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
+ case CEPH_OSD_OP_RDLOCK: return "rdlock";
+ case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
+ case CEPH_OSD_OP_UPLOCK: return "uplock";
+ case CEPH_OSD_OP_DNLOCK: return "dnlock";
default: return "???";
}
union {
struct {
__le64 offset, length;
- __le32 seq;
};
struct {
__le32 name_len;
__le32 value_len;
};
+ struct {
+ __le64 truncate_size;
+ __le32 truncate_seq;
+ };
};
} __attribute__ ((packed));
/*
 * Stream a human-readable summary of an OSD op.
 *
 * Always writes the op name (via ceph_osd_op_name), followed by:
 *   - data ops SETTRUNC / MASKTRUNC / TRIMTRUNC:  " <truncate_seq>@<truncate_size>"
 *   - every other data op:                        " <offset>~<length>"
 *   - attr ops:                                   " <name_len>+<value_len>"
 * Ops that are neither data nor attr ops get the name only.
 * Returns the stream to allow chaining.
 */
inline ostream& operator<<(ostream& out, const ceph_osd_op& op) {
out << ceph_osd_op_name(op.op);
- if (ceph_osd_op_type_data(op.op))
- out << " " << op.offset << "~" << op.length;
- else if (ceph_osd_op_type_attr(op.op))
+ if (ceph_osd_op_type_data(op.op)) {
+ if (op.op == CEPH_OSD_OP_SETTRUNC ||
+ op.op == CEPH_OSD_OP_MASKTRUNC ||
+ op.op == CEPH_OSD_OP_TRIMTRUNC)  // the *TRUNC ops are printed from the truncate_* fields, not offset/length
+ out << " " << op.truncate_seq << "@" << op.truncate_size;
+ else
+ out << " " << op.offset << "~" << op.length;
+ } else if (ceph_osd_op_type_attr(op.op))
out << " " << op.name_len << "+" << op.value_len;
return out;
}
}
break;
+ case CEPH_OSD_OP_MASKTRUNC:
+ if (p != op->ops.begin()) {
+ ceph_osd_op& rd = *(p - 1);
+ ceph_osd_op& m = *p;
+
+ // are we beyond truncate_size?
+ if (rd.offset + rd.length > m.truncate_size) {
+ __u32 seq;
+ interval_set<__u64> tm;
+ bufferlist::iterator p = oi.truncate_info.begin();
+ ::decode(seq, p);
+ ::decode(tm, p);
+
+ // truncated portion of the read
+ unsigned from = MAX(rd.offset, m.truncate_size); // also end of data
+ unsigned to = rd.offset + rd.length;
+ unsigned trim = to-from;
+
+ rd.length = rd.length - trim;
+
+ dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl;
+
+ bufferlist keep;
+ keep.substr_of(data, 0, data.length() - trim);
+ bufferlist truncated; // everything after 'from'
+ truncated.substr_of(data, data.length() - trim, trim);
+ keep.swap(data);
+
+ if (seq == rd.truncate_seq) {
+ // keep any valid extents beyond 'from'
+ unsigned data_end = from;
+ for (map<__u64,__u64>::iterator q = tm.m.begin();
+ q != tm.m.end();
+ q++) {
+ unsigned s = MAX(q->first, from);
+ unsigned e = MIN(q->first+q->second, to);
+ if (e > s) {
+ unsigned l = e-s;
+ dout(10) << " " << q->first << "~" << q->second << " overlap " << s << "~" << l << dendl;
+
+ // add in zeros?
+ if (s > data_end) {
+ bufferptr bp(s-from);
+ bp.zero();
+ data.push_back(bp);
+ dout(20) << " adding " << bp.length() << " zeros" << dendl;
+ rd.length = rd.length + bp.length();
+ data_end += bp.length();
+ }
+
+ bufferlist b;
+ b.substr_of(truncated, s-from, l);
+ dout(20) << " adding " << b.length() << " bytes from " << s << "~" << l << dendl;
+ data.claim_append(b);
+ rd.length = rd.length + l;
+ data_end += l;
+ }
+ } // for
+ } // seq == rd.truncate_seq
+ }
+ }
+ break;
+
default:
dout(1) << "unrecognized osd op " << p->op
<< " " << ceph_osd_op_name(p->op)
// low level object operations
int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
- ceph_osd_op& op, bufferlist::iterator& bp,
+ vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp,
SnapContext& snapc)
{
+ ceph_osd_op& op = ops[opn];
int eop = op.op;
// munge ZERO -> DELETE or TRUNCATE?
{
// just do it inline; this works because we are happy to execute
// fancy op on replicas as well.
- ceph_osd_op newop;
+ vector<ceph_osd_op> nops(1);
+ ceph_osd_op& newop = nops[0];
newop.op = CEPH_OSD_OP_WRITE;
newop.offset = old_size;
newop.length = op.length;
- prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, newop, bp, snapc);
+ prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
}
break;
t.start_sync();
break;
+ case CEPH_OSD_OP_SETTRUNC:
+ if (opn > 0 && ops[opn-1].op == CEPH_OSD_OP_WRITE) {
+ // set truncate seq over preceding write's range
+ ceph_osd_op& wr = ops[opn-1];
+
+ __u32 seq = 0;
+ interval_set<__u64> tm;
+ bufferlist::iterator p;
+ if (oi.truncate_info.length()) {
+ p = oi.truncate_info.begin();
+ ::decode(seq, p);
+ }
+ if (seq < op.truncate_seq) {
+ seq = op.truncate_seq;
+ tm.insert(wr.offset, wr.length);
+ } else {
+ if (oi.truncate_info.length())
+ ::decode(tm, p);
+ interval_set<__u64> n;
+ n.insert(wr.offset, wr.length);
+ tm.union_of(n);
+ }
+ oi.truncate_info.clear();
+ ::encode(seq, oi.truncate_info);
+ ::encode(tm, oi.truncate_info);
+ }
+ break;
+
default:
return -EINVAL;
}
did_snap = true;
}
prepare_simple_op(t, reqid, info.stats, poid, size, exists, oi,
- ops[i], bp, snapc);
+ ops, i, bp, snapc);
}
// finish.
void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);
int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
- ceph_osd_op& op, bufferlist::iterator& bp, SnapContext& snapc);
+ vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp, SnapContext& snapc);
void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
pobject_t poid,
vector<ceph_osd_op>& ops, bufferlist& bl,
SnapSet snapset; // [head]
vector<snapid_t> snaps; // [clone]
+ bufferlist truncate_info; // bah.. messy layering.
+
void encode(bufferlist& bl) const {
::encode(poid, bl);
::encode(version, bl);
::encode(wrlock_by, bl);
} else
::encode(snaps, bl);
+ ::encode(truncate_info, bl);
}
void decode(bufferlist::iterator& bl) {
::decode(poid, bl);
::decode(wrlock_by, bl);
} else
::decode(snaps, bl);
+ ::decode(truncate_info, bl);
}
void decode(bufferlist& bl) {
bufferlist::iterator p = bl.begin();