struct {
__le64 offset, length;
__le64 src_offset;
- __u8 src_oid_idx;
} __attribute__ ((packed)) clonerange;
};
__le32 payload_len;
for (unsigned i = 0; i < ops.size(); i++) {
if (ceph_osd_op_type_multi(ops[i].op.op)) {
- ops[i].op.payload_len = ops[i].oid.name.length();
- data.append(ops[i].oid.name);
+ bufferlist bl;
+ ::encode(ops[i].soid, bl);
+ ops[i].op.payload_len = bl.length();
+ data.append(bl);
} else {
ops[i].op.payload_len = ops[i].data.length();
data.append(ops[i].data);
bufferlist t;
ops[i].data.substr_of(t, off, ops[i].op.payload_len);
off += ops[i].op.payload_len;
- if (t.length())
- oid.name = t.c_str();
+ if (t.length()) {
+ bufferlist::iterator p = t.begin();
+ ::decode(ops[i].soid, p);
+ }
} else {
ops[i].data.substr_of(data, off, ops[i].op.payload_len);
off += ops[i].op.payload_len;
for (vector<OSDOp>::const_iterator p = op->ops.begin();
p != op->ops.end();
++p) {
- if (p->oid.name.length() == 0)
+ if (p->soid.oid.name.length() == 0)
continue;
- sobject_t soid(p->oid, CEPH_NOSNAP);
- if (pg->is_missing_object(soid)) {
- pg->wait_for_missing_object(soid, op);
+ sobject_t head(p->soid.oid, CEPH_NOSNAP);
+ if (pg->is_missing_object(head)) {
+ pg->wait_for_missing_object(head, op);
pg->unlock();
return;
}
op->rmw_flags |= CEPH_OSD_FLAG_READ;
// set READ flag if there are src_oids
- if (iter->oid.name.length())
+ if (iter->soid.oid.name.length())
op->rmw_flags |= CEPH_OSD_FLAG_READ;
// set PGOP flag if there are PG ops
dout(10) << "do_op mode now " << mode << dendl;
// src_oids
- map<object_t,ObjectContext*> src_obc;
+ map<sobject_t,ObjectContext*> src_obc;
for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
OSDOp& osd_op = *p;
- if (osd_op.oid.name.length()) {
- if (!src_obc.count(osd_op.oid)) {
+ if (osd_op.soid.oid.name.length()) {
+ if (!src_obc.count(osd_op.soid)) {
ObjectContext *sobc;
snapid_t ssnapid;
- int r = find_object_context(osd_op.oid, op->get_object_locator(), CEPH_NOSNAP, &sobc, false, &ssnapid);
+ int r = find_object_context(osd_op.soid.oid, op->get_object_locator(), osd_op.soid.snap,
+ &sobc, false, &ssnapid);
+ if (r == -EAGAIN) {
+ // If we're not the primary of this OSD, and we have
+ // CEPH_OSD_FLAG_LOCALIZE_READS set, we just return -EAGAIN. Otherwise,
+ // we have to wait for the object.
+ if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
+ // missing the specific snap we need; requeue and wait.
+ sobject_t soid(osd_op.soid.oid, ssnapid);
+ wait_for_missing_object(soid, op);
+ return;
+ }
+ }
if (r) {
osd->reply_op_error(op, r);
return;
if (sobc->obs.oi.oloc.key != obc->obs.oi.oloc.key &&
sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
- dout(1) << " src_oid " << osd_op.oid << " oloc " << sobc->obs.oi.oloc << " != "
+ dout(1) << " src_oid " << osd_op.soid << " oloc " << sobc->obs.oi.oloc << " != "
<< op->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
osd->reply_op_error(op, -EINVAL);
return;
}
- src_obc[osd_op.oid] = sobc;
+ src_obc[osd_op.soid] = sobc;
}
}
}
case CEPH_OSD_OP_CLONERANGE:
{
bufferlist::iterator p = osd_op.data.begin();
- object_t& src_oid = osd_op.oid;
- ObjectContext *sobc = ctx->src_obc[src_oid];
+ ObjectContext *sobc = ctx->src_obc[osd_op.soid];
if (!obs.exists)
t.touch(coll, obs.oi.soid);
}
}
-void ReplicatedPG::put_object_contexts(map<object_t,ObjectContext*>& obcv)
+void ReplicatedPG::put_object_contexts(map<sobject_t,ObjectContext*>& obcv)
{
- for (map<object_t,ObjectContext*>::iterator p = obcv.begin(); p != obcv.end(); ++p)
+ for (map<sobject_t,ObjectContext*>::iterator p = obcv.begin(); p != obcv.end(); ++p)
put_object_context(p->second);
obcv.clear();
}
vector<PG::Log::Entry> log;
ObjectContext *obc;
- map<object_t,ObjectContext*> src_obc;
+ map<sobject_t,ObjectContext*> src_obc;
ObjectContext *clone_obc; // if we created a clone
ObjectContext *snapset_obc; // if we created/deleted a snapdir
OpContext *ctx;
ObjectContext *obc;
- map<object_t,ObjectContext*> src_obc;
+ map<sobject_t,ObjectContext*> src_obc;
tid_t rep_tid;
register_snapset_context(obc->ssc);
}
void put_object_context(ObjectContext *obc);
- void put_object_contexts(map<object_t,ObjectContext*>& obcv);
+ void put_object_contexts(map<sobject_t,ObjectContext*>& obcv);
int find_object_context(const object_t& oid, const object_locator_t& oloc,
snapid_t snapid, ObjectContext **pobc,
bool can_create, snapid_t *psnapid=NULL);
struct OSDOp {
ceph_osd_op op;
bufferlist data;
- object_t oid;
+ sobject_t soid;
OSDOp() {
memset(&op, 0, sizeof(ceph_osd_op));
break;
case CEPH_OSD_OP_CLONERANGE:
out << " " << op.op.clonerange.offset << "~" << op.op.clonerange.length
- << " from " << op.oid
+ << " from " << op.soid
<< " offset " << op.op.clonerange.src_offset;
break;
default:
ops[s].op.extent.length = len;
ops[s].data.claim_append(bl);
}
- void add_clone_range(int op, uint64_t off, uint64_t len, const object_t& srcoid, uint64_t srcoff) {
+ void add_clone_range(int op, uint64_t off, uint64_t len, const object_t& srcoid, uint64_t srcoff, snapid_t srcsnapid) {
int s = ops.size();
ops.resize(s+1);
ops[s].op.op = op;
ops[s].op.clonerange.offset = off;
ops[s].op.clonerange.length = len;
ops[s].op.clonerange.src_offset = srcoff;
- ops[s].oid = srcoid;
+ ops[s].soid = sobject_t(srcoid, srcsnapid);
}
void add_xattr(int op, const char *name, const bufferlist& data) {
int s = ops.size();
}
void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len, uint64_t dst_offset) {
- add_clone_range(CEPH_OSD_OP_CLONERANGE, dst_offset, len, src_oid, src_offset);
+ add_clone_range(CEPH_OSD_OP_CLONERANGE, dst_offset, len, src_oid, src_offset, CEPH_NOSNAP);
}
// object attrs