newop.offset = old_size;
newop.length = op.length;
do_osd_ops(ctx, nops, bp, odata, exists, old_size);
- //prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
}
break;
newop.offset = op.truncate_size;
dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq
<< ", truncating with " << newop << dendl;
- //prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
do_osd_ops(ctx, nops, bp, odata, exists, old_size);
} else {
// do smart truncate
newop.op = CEPH_OSD_OP_ZERO;
newop.offset = p->first;
newop.length = p->second;
- //prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
do_osd_ops(ctx, nops, bp, odata, exists, old_size);
}
}
}
-// low level object operations
-#if 0
-/**
- * Translate one modifying OSD op, ops[opn], into ObjectStore transaction
- * calls and update the associated object and PG bookkeeping.
- *
- * @param t        transaction that receives the store operations
- * @param reqid    request id; stored as the lock holder by WRLOCK
- * @param st       pg stats; num_objects/num_bytes/num_kb are adjusted here
- * @param soid     object being operated on
- * @param old_size in/out: object size before this op, updated to the new size
- * @param exists   in/out: whether the object exists; set when the head is
- *                 created and cleared by DELETE
- * @param oi       object info; snapset, wrlock_by and truncate_info are updated
- * @param ops      op vector; the munging below may rewrite ops[opn].offset
- * @param opn      index of the op to execute
- * @param bp       payload iterator; write data and xattr name/value are
- *                 copied from it
- * @param snapc    snap context; only forwarded to the nested calls
- * @return 0 on success, -EINVAL if the op code is not handled here
- */
-int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
-                                    sobject_t soid, __u64& old_size, bool& exists, object_info_t& oi,
-                                    vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp,
-                                    SnapContext& snapc)
-{
-  ceph_osd_op& op = ops[opn];
-  // effective op: may differ from op.op after the munging below
-  int eop = op.op;
-
-  dout(15) << "prepare_simple_op " << reqid << " " << opn << ": " << ops[opn] << " on " << oi << dendl;
-
-  // munge ZERO -> TRUNCATE? (don't munge to DELETE or we risk hosing attributes)
-  if (eop == CEPH_OSD_OP_ZERO &&
-      oi.snapset.head_exists &&
-      op.offset + op.length >= old_size) {
-    dout(10) << " munging ZERO " << op.offset << "~" << op.length
-             << " -> TRUNCATE " << op.offset << " (old size is " << old_size << ")" << dendl;
-    eop = CEPH_OSD_OP_TRUNCATE;
-    oi.snapset.head_exists = true;
-  }
-  // munge DELETE -> TRUNCATE?
-  if (eop == CEPH_OSD_OP_DELETE &&
-      oi.snapset.clones.size()) {
-    dout(10) << " munging DELETE -> TRUNCATE 0 bc of clones " << oi.snapset.clones << dendl;
-    eop = CEPH_OSD_OP_TRUNCATE;
-    op.offset = 0;
-    oi.snapset.head_exists = false;
-  }
-
-  switch (eop) {
-
-    // -- locking --
-  case CEPH_OSD_OP_WRLOCK:
-    oi.wrlock_by = reqid;
-    break;
-
-  case CEPH_OSD_OP_WRUNLOCK:
-    // a default-constructed reqid marks the object as unlocked
-    oi.wrlock_by = osd_reqid_t();
-    break;
-
-  case CEPH_OSD_OP_BALANCEREADS:
-    {
-      bool bal = true;
-      t.setattr(info.pgid.to_coll(), soid, "balance-reads", &bal, sizeof(bal));
-    }
-    break;
-  case CEPH_OSD_OP_UNBALANCEREADS:
-    {
-      t.rmattr(info.pgid.to_coll(), soid, "balance-reads");
-    }
-    break;
-
-
-    // -- object data --
-
-  case CEPH_OSD_OP_WRITE:
-    { // write
-      assert(op.length);
-      bufferlist nbl;
-      bp.copy(op.length, nbl);
-      t.write(info.pgid.to_coll(), soid, op.offset, op.length, nbl);
-      if (oi.snapset.clones.size()) {
-        // the written range no longer overlaps the newest clone
-        snapid_t newest = *oi.snapset.clones.rbegin();
-        interval_set<__u64> ch;
-        ch.insert(op.offset, op.length);
-        ch.intersection_of(oi.snapset.clone_overlap[newest]);
-        oi.snapset.clone_overlap[newest].subtract(ch);
-        add_interval_usage(ch, st);
-      }
-      if (op.offset + op.length > old_size) {
-        // write extends the object: account for the growth
-        __u64 new_size = op.offset + op.length;
-        st.num_bytes += new_size - old_size;
-        st.num_kb += SHIFT_ROUND_UP(new_size, 10) - SHIFT_ROUND_UP(old_size, 10);
-        old_size = new_size;
-      }
-      oi.snapset.head_exists = true;
-    }
-    break;
-
-  case CEPH_OSD_OP_WRITEFULL:
-    { // write full object
-      bufferlist nbl;
-      bp.copy(op.length, nbl);
-      t.truncate(info.pgid.to_coll(), soid, 0);
-      t.write(info.pgid.to_coll(), soid, op.offset, op.length, nbl);
-      if (oi.snapset.clones.size()) {
-        snapid_t newest = *oi.snapset.clones.rbegin();
-        oi.snapset.clone_overlap.erase(newest);
-        old_size = 0;
-      }
-      if (op.length != old_size) {
-        st.num_bytes -= old_size;
-        st.num_kb -= SHIFT_ROUND_UP(old_size, 10);
-        st.num_bytes += op.length;
-        st.num_kb += SHIFT_ROUND_UP(op.length, 10);
-        old_size = op.length;
-      }
-      oi.snapset.head_exists = true;
-    }
-    break;
-
-  case CEPH_OSD_OP_ZERO:
-    { // zero
-      assert(op.length);
-      if (!exists)
-        t.touch(info.pgid.to_coll(), soid);
-      t.zero(info.pgid.to_coll(), soid, op.offset, op.length);
-      if (oi.snapset.clones.size()) {
-        // the zeroed range no longer overlaps the newest clone
-        snapid_t newest = *oi.snapset.clones.rbegin();
-        interval_set<__u64> ch;
-        ch.insert(op.offset, op.length);
-        ch.intersection_of(oi.snapset.clone_overlap[newest]);
-        oi.snapset.clone_overlap[newest].subtract(ch);
-        add_interval_usage(ch, st);
-      }
-      oi.snapset.head_exists = true;
-    }
-    break;
-
-  case CEPH_OSD_OP_TRUNCATE:
-    { // truncate
-      if (!exists)
-        t.touch(info.pgid.to_coll(), soid);
-      t.truncate(info.pgid.to_coll(), soid, op.offset);
-      if (oi.snapset.clones.size()) {
-        snapid_t newest = *oi.snapset.clones.rbegin();
-        // account for the part of the overlap that is cut off by the truncate
-        interval_set<__u64> trim;
-        if (old_size > op.offset) {
-          trim.insert(op.offset, old_size-op.offset);
-          trim.intersection_of(oi.snapset.clone_overlap[newest]);
-          add_interval_usage(trim, st);
-        }
-        // only [0, op.offset) can still overlap the newest clone
-        interval_set<__u64> keep;
-        if (op.offset)
-          keep.insert(0, op.offset);
-        oi.snapset.clone_overlap[newest].intersection_of(keep);
-      }
-      if (op.offset != old_size) {
-        st.num_bytes -= old_size;
-        st.num_kb -= SHIFT_ROUND_UP(old_size, 10);
-        st.num_bytes += op.offset;
-        st.num_kb += SHIFT_ROUND_UP(op.offset, 10);
-        old_size = op.offset;
-      }
-      // do not set head_exists, or we will break above DELETE -> TRUNCATE munging.
-    }
-    break;
-
-  case CEPH_OSD_OP_DELETE:
-    { // delete
-      t.remove(info.pgid.to_coll(), soid);
-      if (oi.snapset.clones.size()) {
-        snapid_t newest = *oi.snapset.clones.rbegin();
-        add_interval_usage(oi.snapset.clone_overlap[newest], st);
-        oi.snapset.clone_overlap.erase(newest);  // ok, redundant.
-      }
-      if (exists) {
-        st.num_objects--;
-        st.num_bytes -= old_size;
-        st.num_kb -= SHIFT_ROUND_UP(old_size, 10);
-        old_size = 0;
-        exists = false;
-        oi.snapset.head_exists = false;
-      }
-    }
-    break;
-
-
-    // -- object attrs --
-
-  case CEPH_OSD_OP_SETXATTR:
-    {
-      if (!exists)
-        t.touch(info.pgid.to_coll(), soid);
-      // stored attr name is the payload name prefixed with '_'
-      nstring name(op.name_len + 1);
-      name[0] = '_';
-      bp.copy(op.name_len, name.data()+1);
-      bufferlist bl;
-      bp.copy(op.value_len, bl);
-      if (!oi.snapset.head_exists)  // create object if it doesn't yet exist.
-        t.touch(info.pgid.to_coll(), soid);
-      t.setattr(info.pgid.to_coll(), soid, name, bl);
-      oi.snapset.head_exists = true;
-    }
-    break;
-
-  case CEPH_OSD_OP_RMXATTR:
-    {
-      // same '_' prefix as used by SETXATTR
-      nstring name(op.name_len + 1);
-      name[0] = '_';
-      bp.copy(op.name_len, name.data()+1);
-      t.rmattr(info.pgid.to_coll(), soid, name);
-    }
-    break;
-
-
-    // -- fancy writers --
-  case CEPH_OSD_OP_APPEND:
-    {
-      // just do it inline; this works because we are happy to execute
-      // fancy op on replicas as well.
-      vector<ceph_osd_op> nops(1);
-      ceph_osd_op& newop = nops[0];
-      newop.op = CEPH_OSD_OP_WRITE;
-      newop.offset = old_size;
-      newop.length = op.length;
-      prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
-    }
-    break;
-
-  case CEPH_OSD_OP_STARTSYNC:
-    t.start_sync();
-    break;
-
-  case CEPH_OSD_OP_SETTRUNC:
-    if (opn > 0 && ops[opn-1].op == CEPH_OSD_OP_WRITE) {
-      // set truncate seq over preceding write's range
-      ceph_osd_op& wr = ops[opn-1];
-
-      __u32 seq = 0;
-      interval_set<__u64> tm;
-      bufferlist::iterator p;
-      if (oi.truncate_info.length()) {
-        p = oi.truncate_info.begin();
-        ::decode(seq, p);
-      }
-      if (seq < op.truncate_seq) {
-        // newer seq: start a fresh map holding only this write
-        seq = op.truncate_seq;
-        tm.insert(wr.offset, wr.length);
-      } else {
-        // otherwise merge this write into the stored map
-        if (oi.truncate_info.length())
-          ::decode(tm, p);
-        interval_set<__u64> n;
-        n.insert(wr.offset, wr.length);
-        tm.union_of(n);
-      }
-      dout(10) << " settrunc seq " << seq << " map " << tm << dendl;
-      oi.truncate_info.clear();
-      ::encode(seq, oi.truncate_info);
-      ::encode(tm, oi.truncate_info);
-    }
-    break;
-
-  case CEPH_OSD_OP_TRIMTRUNC:
-    if (exists) {
-      __u32 old_seq = 0;
-      bufferlist::iterator p;
-      if (oi.truncate_info.length()) {
-        p = oi.truncate_info.begin();
-        ::decode(old_seq, p);
-      }
-
-      if (op.truncate_seq > old_seq) {
-        // just truncate/delete.
-        vector<ceph_osd_op> nops(1);
-        ceph_osd_op& newop = nops[0];
-        newop.op = CEPH_OSD_OP_TRUNCATE;
-        newop.offset = op.truncate_size;
-        dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq
-                 << ", truncating with " << newop << dendl;
-        prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
-      } else {
-        // do smart truncate
-        // NOTE(review): p is only assigned when truncate_info is non-empty;
-        // this decode presumably relies on an earlier SETTRUNC having
-        // populated it -- confirm.
-        interval_set<__u64> tm;
-        ::decode(tm, p);
-
-        // zero = [0, old_size) minus the ranges recorded in tm
-        interval_set<__u64> zero;
-        zero.insert(0, old_size);
-        tm.intersection_of(zero);
-        zero.subtract(tm);
-
-        dout(10) << " seq " << op.truncate_seq << " == old_seq " << old_seq
-                 << ", tm " << tm << ", zeroing " << zero << dendl;
-        for (map<__u64,__u64>::iterator p = zero.m.begin();
-             p != zero.m.end();
-             p++) {
-          vector<ceph_osd_op> nops(1);
-          ceph_osd_op& newop = nops[0];
-          newop.op = CEPH_OSD_OP_ZERO;
-          newop.offset = p->first;
-          newop.length = p->second;
-          prepare_simple_op(t, reqid, st, soid, old_size, exists, oi, nops, 0, bp, snapc);
-        }
-
-        oi.truncate_info.clear();
-      }
-    }
-    break;
-
-  default:
-    return -EINVAL;
-  }
-
-  // the op created the head object: count it
-  if (!exists && oi.snapset.head_exists) {
-    st.num_objects++;
-    exists = true;
-  }
-
-  return 0;
-}
-#endif
int ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size)
{