/*
* return false if object doesn't (logically) exist
*/
-int ReplicatedPG::pick_read_snap(pobject_t& poid)
+int ReplicatedPG::pick_read_snap(pobject_t& poid, object_info_t& coi)
{
pobject_t head = poid;
head.oid.snap = CEPH_NOSNAP;
}
// clone
- bl.clear();
- r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bl);
- if (r < 0) {
- dout(20) << "pick_read_snap " << poid << " dne, wtf" << dendl;
- assert(0);
- return -ENOENT;
- }
- object_info_t coi(bl);
-
dout(20) << "pick_read_snap " << poid << " snaps " << coi.snaps << dendl;
snapid_t first = coi.snaps[coi.snaps.size()-1];
snapid_t last = coi.snaps[0];
dout(10) << "op_read " << oid << " " << op->ops << dendl;
+ object_info_t oi(poid);
+
+ bufferlist bv;
+ osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+ if (bv.length())
+ oi.decode(bv);
+
// wrlocked?
- if (block_if_wrlocked(op))
+ if (poid.oid.snap == CEPH_NOSNAP &&
+ block_if_wrlocked(op, oi))
return;
// do it.
if (poid.oid.snap) {
- result = pick_read_snap(poid);
+ result = pick_read_snap(poid, oi);
if (result == -EAGAIN) {
wait_for_missing_object(poid.oid, op);
return;
// low level object operations
int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
- pobject_t poid, __u64& old_size, bool& exists,
+ pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
ceph_osd_op& op, bufferlist::iterator& bp,
- SnapSet& snapset, SnapContext& snapc)
+ SnapContext& snapc)
{
int eop = op.op;
// munge ZERO -> DELETE or TRUNCATE?
if (eop == CEPH_OSD_OP_ZERO &&
- snapset.head_exists &&
+ oi.snapset.head_exists &&
op.offset + op.length >= old_size) {
if (op.offset == 0) {
// FIXME: no, this will zap object attributes... do we really want
dout(10) << " munging ZERO " << op.offset << "~" << op.length
<< " -> TRUNCATE (size is " << old_size << ")" << dendl;
eop = CEPH_OSD_OP_TRUNCATE;
- snapset.head_exists = true;
+ oi.snapset.head_exists = true;
}
}
// munge DELETE -> TRUNCATE?
if (eop == CEPH_OSD_OP_DELETE &&
- snapset.clones.size()) {
- dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << snapset.clones << dendl;
+ oi.snapset.clones.size()) {
+ dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << oi.snapset.clones << dendl;
eop = CEPH_OSD_OP_TRUNCATE;
op.length = 0;
- snapset.head_exists = false;
+ oi.snapset.head_exists = false;
}
switch (eop) {
// -- locking --
case CEPH_OSD_OP_WRLOCK:
- { // lock object
- bufferlist b(sizeof(entity_name_t));
- ::encode(reqid.name, b);
- t.setattr(info.pgid.to_coll(), poid, "wrlock", b);
- }
- break;
+ oi.wrlock_by = reqid;
+ break;
+
case CEPH_OSD_OP_WRUNLOCK:
- { // unlock objects
- t.rmattr(info.pgid.to_coll(), poid, "wrlock");
- }
+ oi.wrlock_by = osd_reqid_t();
break;
case CEPH_OSD_OP_BALANCEREADS:
bufferlist nbl;
bp.copy(op.length, nbl);
t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
- if (snapset.clones.size()) {
- snapid_t newest = *snapset.clones.rbegin();
+ if (oi.snapset.clones.size()) {
+ snapid_t newest = *oi.snapset.clones.rbegin();
interval_set<__u64> ch;
ch.insert(op.offset, op.length);
- ch.intersection_of(snapset.clone_overlap[newest]);
- snapset.clone_overlap[newest].subtract(ch);
+ ch.intersection_of(oi.snapset.clone_overlap[newest]);
+ oi.snapset.clone_overlap[newest].subtract(ch);
add_interval_usage(ch, st);
}
if (op.offset + op.length > old_size) {
st.num_kb += SHIFT_ROUND_UP(new_size, 10) - SHIFT_ROUND_UP(old_size, 10);
old_size = new_size;
}
- snapset.head_exists = true;
+ oi.snapset.head_exists = true;
}
break;
bp.copy(op.length, nbl);
t.truncate(info.pgid.to_coll(), poid, 0);
t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
- if (snapset.clones.size()) {
- snapid_t newest = *snapset.clones.rbegin();
- snapset.clone_overlap.erase(newest);
+ if (oi.snapset.clones.size()) {
+ snapid_t newest = *oi.snapset.clones.rbegin();
+ oi.snapset.clone_overlap.erase(newest);
old_size = 0;
}
if (op.length != old_size) {
st.num_kb += SHIFT_ROUND_UP(op.length, 10);
old_size = op.length;
}
- snapset.head_exists = true;
+ oi.snapset.head_exists = true;
}
break;
if (!exists)
t.touch(info.pgid.to_coll(), poid);
t.zero(info.pgid.to_coll(), poid, op.offset, op.length);
- if (snapset.clones.size()) {
- snapid_t newest = *snapset.clones.rbegin();
+ if (oi.snapset.clones.size()) {
+ snapid_t newest = *oi.snapset.clones.rbegin();
interval_set<__u64> ch;
ch.insert(op.offset, op.length);
- ch.intersection_of(snapset.clone_overlap[newest]);
- snapset.clone_overlap[newest].subtract(ch);
+ ch.intersection_of(oi.snapset.clone_overlap[newest]);
+ oi.snapset.clone_overlap[newest].subtract(ch);
add_interval_usage(ch, st);
}
- snapset.head_exists = true;
+ oi.snapset.head_exists = true;
}
break;
if (!exists)
t.touch(info.pgid.to_coll(), poid);
t.truncate(info.pgid.to_coll(), poid, op.length);
- if (snapset.clones.size()) {
- snapid_t newest = *snapset.clones.rbegin();
+ if (oi.snapset.clones.size()) {
+ snapid_t newest = *oi.snapset.clones.rbegin();
interval_set<__u64> trim;
if (old_size > op.length) {
trim.insert(op.length, old_size-op.length);
- trim.intersection_of(snapset.clone_overlap[newest]);
+ trim.intersection_of(oi.snapset.clone_overlap[newest]);
add_interval_usage(trim, st);
}
interval_set<__u64> keep;
if (op.length)
keep.insert(0, op.length);
- snapset.clone_overlap[newest].intersection_of(keep);
+ oi.snapset.clone_overlap[newest].intersection_of(keep);
}
if (op.length != old_size) {
st.num_bytes -= old_size;
case CEPH_OSD_OP_DELETE:
{ // delete
t.remove(info.pgid.to_coll(), poid);
- if (snapset.clones.size()) {
- snapid_t newest = *snapset.clones.rbegin();
- add_interval_usage(snapset.clone_overlap[newest], st);
- snapset.clone_overlap.erase(newest); // ok, redundant.
+ if (oi.snapset.clones.size()) {
+ snapid_t newest = *oi.snapset.clones.rbegin();
+ add_interval_usage(oi.snapset.clone_overlap[newest], st);
+ oi.snapset.clone_overlap.erase(newest); // ok, redundant.
}
if (exists) {
st.num_objects--;
st.num_kb -= SHIFT_ROUND_UP(old_size, 10);
old_size = 0;
exists = false;
- snapset.head_exists = false;
+ oi.snapset.head_exists = false;
}
}
break;
bp.copy(op.name_len, name.data()+1);
bufferlist bl;
bp.copy(op.value_len, bl);
- if (!snapset.head_exists) // create object if it doesn't yet exist.
+ if (!oi.snapset.head_exists) // create object if it doesn't yet exist.
t.touch(info.pgid.to_coll(), poid);
t.setattr(info.pgid.to_coll(), poid, name, bl);
- snapset.head_exists = true;
+ oi.snapset.head_exists = true;
}
break;
newop.op = CEPH_OSD_OP_WRITE;
newop.offset = old_size;
newop.length = op.length;
- prepare_simple_op(t, reqid, st, poid, old_size, exists, newop, bp, snapset, snapc);
+ prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, newop, bp, snapc);
}
break;
return -EINVAL;
}
- if (!exists && snapset.head_exists) {
+ if (!exists && oi.snapset.head_exists) {
st.num_objects++;
exists = true;
}
at_version, snapc);
did_snap = true;
}
- prepare_simple_op(t, reqid, info.stats, poid, size, exists,
- ops[i], bp,
- oi.snapset, snapc);
+ prepare_simple_op(t, reqid, info.stats, poid, size, exists, oi,
+ ops[i], bp, snapc);
}
// finish.
int whoami = osd->get_nodeid();
pobject_t poid(info.pgid.pool(), 0, op->get_oid());
- // --- locking ---
-
- // wrlock?
- if (!op->ops.empty() && // except WRNOOP; we just want to flush
- block_if_wrlocked(op))
- return; // op will be handled later, after the object unlocks
-
// balance-reads set?
#if 0
char v;
}
#endif
+ // get existing object info
+ ProjectedObjectInfo *pinfo = get_projected_object(poid);
+
+ // --- locking ---
+
+ // wrlock?
+ if (!op->ops.empty() && // except noop; we just want to flush
+ block_if_wrlocked(op, pinfo->oi)) {
+ put_projected_object(pinfo);
+ return; // op will be handled later, after the object unlocks
+ }
+
// dup op?
bool noop = false;
const char *opname;
snapc.seq = op->get_snap_seq();
snapc.snaps = op->get_snaps();
- // get existing object info
- ProjectedObjectInfo *pinfo = get_projected_object(poid);
-
// set version in op, for benefit of client and our eventual reply
op->set_version(at_version);