dout(10) << "write to " << oid << dendl;
ObjectMutation m;
- m.ops.push_back(ceph_osd_op());
- m.ops[0].op = CEPH_OSD_OP_WRITE;
- m.ops[0].offset = 0;
- m.ops[0].length = osize;
+ OSDOp op;
+ op.op.op = CEPH_OSD_OP_WRITE;
+ op.op.offset = 0;
+ op.op.length = osize;
+ op.data = bl;
+ m.ops.push_back(op);
if (do_sync) {
- m.ops.push_back(ceph_osd_op());
- m.ops[1].op = CEPH_OSD_OP_STARTSYNC;
+ OSDOp op;
+ op.op.op = CEPH_OSD_OP_STARTSYNC;
+ m.ops.push_back(op);
}
- m.data = bl;
client->objecter->mutate(oid, layout, m, snapc, 0,
NULL, new C_Ref(lock, cond, &unack));
/*client->objecter->write(oid, layout, 0, osize, snapc, bl, 0,
__le64 pgls_cookie, count;
};
};
+ __le32 payload_len;
} __attribute__ ((packed));
struct ceph_osd_request_head {
reqhead = req->r_request->front.iov_base;
op = (void *)(reqhead + 1);
op->length = cpu_to_le64(len);
+ op->payload_len = op->length;
req->r_request->hdr.data_len = cpu_to_le32(len);
rc = ceph_osdc_start_request(&client->osdc, req);
if (flags & CEPH_OSD_FLAG_WRITE) {
req->r_request->hdr.data_off = cpu_to_le16(off);
req->r_request->hdr.data_len = cpu_to_le32(*plen);
+ op->payload_len = cpu_to_le32(*plen);
}
+
/* fill in oid, ticket */
head->object_len = cpu_to_le32(req->r_oid_len);
memcpy(p, req->r_oid, req->r_oid_len);
ceph_osd_request_head head;
public:
object_t oid;
- vector<ceph_osd_op> ops;
+ vector<OSDOp> ops;
bufferlist ticket;
vector<snapid_t> snaps;
osd_peer_stat_t peer_stat;
// ops
void add_simple_op(int o, __u64 off, __u64 len) {
- ceph_osd_op op;
- op.op = o;
- op.offset = off;
- op.length = len;
- ops.push_back(op);
+ OSDOp osd_op;
+ osd_op.op.op = o;
+ osd_op.op.offset = off;
+ osd_op.op.length = len;
+ ops.push_back(osd_op);
}
void write(__u64 off, __u64 len, bufferlist& bl) {
add_simple_op(CEPH_OSD_OP_WRITE, off, len);
head.num_ops = ops.size();
head.ticket_len = ticket.length();
::encode(head, payload);
- ::encode_nohead(ops, payload);
+
+ for (unsigned i = 0; i < head.num_ops; i++) {
+ ops[i].op.payload_len = ops[i].data.length();
+ ::encode(ops[i].op, payload);
+ data.append(ops[i].data);
+ }
+
::encode_nohead(oid.name, payload);
::encode_nohead(ticket, payload);
::encode_nohead(snaps, payload);
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(head, p);
- decode_nohead(head.num_ops, ops, p);
+ ops.resize(head.num_ops);
+ unsigned off = 0;
+ for (unsigned i = 0; i < head.num_ops; i++) {
+ ::decode(ops[i].op, p);
+ ops[i].data.substr_of(data, off, ops[i].op.payload_len);
+ off += ops[i].op.payload_len;
+ }
decode_nohead(head.object_len, oid.name, p);
decode_nohead(head.ticket_len, ticket, p);
decode_nohead(head.num_snaps, snaps, p);
ceph_osd_reply_head head;
public:
object_t oid;
- vector<ceph_osd_op> ops;
+ vector<OSDOp> ops;
long get_tid() { return head.tid; }
object_t get_oid() { return oid; }
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(head, p);
- ::decode_nohead(head.num_ops, ops, p);
+ ops.resize(head.num_ops);
+ unsigned off = 0;
+ for (unsigned i = 0; i < head.num_ops; i++) {
+ ::decode(ops[i].op, p);
+ ops[i].data.substr_of(data, off, ops[i].op.payload_len);
+ off += ops[i].op.payload_len;
+ }
::decode_nohead(head.object_len, oid.name, p);
}
virtual void encode_payload() {
head.num_ops = ops.size();
head.object_len = oid.name.length();
::encode(head, payload);
- ::encode_nohead(ops, payload);
+ for (unsigned i = 0; i < head.num_ops; i++) {
+ ops[i].op.payload_len = ops[i].data.length();
+ ::encode(ops[i].op, payload);
+ data.append(ops[i].data);
+ }
::encode_nohead(oid.name, payload);
}
__u8 acks_wanted;
// op to exec
- vector<ceph_osd_op> ops;
+ vector<OSDOp> ops;
utime_t mtime;
bool noop;
::decode(reqid, p);
::decode(pgid, p);
::decode(poid, p);
- ::decode(ops, p);
+
+ unsigned num_ops;
+ ::decode(num_ops, p);
+ ops.resize(num_ops);
+ unsigned off = 0;
+ for (unsigned i = 0; i < num_ops; i++) {
+ ::decode(ops[i].op, p);
+ ops[i].data.substr_of(data, off, ops[i].op.payload_len);
+ off += ops[i].op.payload_len;
+ }
::decode(mtime, p);
::decode(noop, p);
::decode(acks_wanted, p);
::encode(reqid, payload);
::encode(pgid, payload);
::encode(poid, payload);
- ::encode(ops, payload);
+
+ __u32 num_ops = ops.size();
+ ::encode(num_ops, payload);
+ for (unsigned i = 0; i < ops.size(); i++) {
+ ops[i].op.payload_len = ops[i].data.length();
+ ::encode(ops[i].op, payload);
+ data.append(ops[i].data);
+ }
::encode(mtime, payload);
::encode(noop, payload);
::encode(acks_wanted, payload);
::encode(data_subset, payload);
::encode(clone_subsets, payload);
if (ops.size())
- header.data_off = ops[0].offset;
+ header.data_off = ops[0].op.offset;
else
header.data_off = 0;
}
tid_t rep_tid;
sobject_t poid;
- vector<ceph_osd_op> ops;
+ vector<OSDOp> ops;
// result
__u8 ack_type;
::decode(pgid, p);
::decode(rep_tid, p);
::decode(poid, p);
- ::decode(ops, p);
+
+ unsigned num_ops;
+ ::decode(num_ops, p);
+ ops.resize(num_ops);
+ unsigned off = 0;
+ for (unsigned i = 0; i < num_ops; i++) {
+ ::decode(ops[i].op, p);
+ ops[i].data.substr_of(data, off, ops[i].op.payload_len);
+ off += ops[i].op.payload_len;
+ }
::decode(ack_type, p);
::decode(result, p);
::decode(pg_complete_thru, p);
::encode(pgid, payload);
::encode(rep_tid, payload);
::encode(poid, payload);
- ::encode(ops, payload);
+ __u32 num_ops = ops.size();
+ ::encode(num_ops, payload);
+ for (unsigned i = 0; i < ops.size(); i++) {
+ ops[i].op.payload_len = ops[i].data.length();
+ ::encode(ops[i].op, payload);
+ data.append(ops[i].data);
+ }
::encode(ack_type, payload);
::encode(result, payload);
::encode(pg_complete_thru, payload);
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
bufferlist odata;
bufferlist idata;
- vector<ceph_osd_op> nops(1);
- ceph_osd_op& op = nops[0];
+ vector<OSDOp> nops(1);
+ OSDOp& op = nops[0];
int r;
- op.op = CEPH_OSD_OP_RDCALL;
- op.class_len = strlen(cls);
- op.method_len = strlen(method);
- op.indata_len = datalen;
- idata.append(cls, op.class_len);
- idata.append(method, op.method_len);
- idata.append(indata, datalen);
- bufferlist::iterator iter = idata.begin();
- r = (*pctx)->pg->do_osd_ops(*pctx, nops, iter, odata);
+ op.op.op = CEPH_OSD_OP_RDCALL;
+ op.op.class_len = strlen(cls);
+ op.op.method_len = strlen(method);
+ op.op.indata_len = datalen;
+ op.data.append(cls, op.op.class_len);
+ op.data.append(method, op.op.method_len);
+ op.data.append(indata, datalen);
+ r = (*pctx)->pg->do_osd_ops(*pctx, nops, odata);
*outdata = (char *)malloc(odata.length());
memcpy(*outdata, odata.c_str(), odata.length());
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
bufferlist name_data;
bufferlist odata;
- vector<ceph_osd_op> nops(1);
- ceph_osd_op& op = nops[0];
+ vector<OSDOp> nops(1);
+ OSDOp& op = nops[0];
int r;
- op.op = CEPH_OSD_OP_GETXATTR;
- name_data.append(name);
- op.name_len = strlen(name);
- bufferlist::iterator iter = name_data.begin();
- r = (*pctx)->pg->do_osd_ops(*pctx, nops, iter, odata);
+ op.op.op = CEPH_OSD_OP_GETXATTR;
+ op.data.append(name);
+ op.op.name_len = strlen(name);
+ r = (*pctx)->pg->do_osd_ops(*pctx, nops, odata);
*outdata = (char *)malloc(odata.length());
memcpy(*outdata, odata.c_str(), odata.length());
char **outdata, int *outdatalen)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
- vector<ceph_osd_op> ops(1);
- ops[0].op = CEPH_OSD_OP_READ;
- ops[0].offset = ofs;
- ops[0].length = len;
- bufferlist idata, odata;
- bufferlist::iterator iter = idata.begin();
- int r = (*pctx)->pg->do_osd_ops(*pctx, ops, iter, odata);
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_READ;
+ ops[0].op.offset = ofs;
+ ops[0].op.length = len;
+ bufferlist odata;
+ int r = (*pctx)->pg->do_osd_ops(*pctx, ops, odata);
*outdata = (char *)malloc(odata.length());
memcpy(*outdata, odata.c_str(), odata.length());
int cls_cxx_read(cls_method_context_t hctx, int ofs, int len, bufferlist *outbl)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
- vector<ceph_osd_op> ops(1);
- ops[0].op = CEPH_OSD_OP_READ;
- ops[0].offset = ofs;
- ops[0].length = len;
- bufferlist idata;
- bufferlist::iterator iter = idata.begin();
- return (*pctx)->pg->do_osd_ops(*pctx, ops, iter, *outbl);
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_READ;
+ ops[0].op.offset = ofs;
+ ops[0].op.length = len;
+ return (*pctx)->pg->do_osd_ops(*pctx, ops, *outbl);
}
int cls_cxx_write(cls_method_context_t hctx, int ofs, int len, bufferlist *inbl)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
- vector<ceph_osd_op> ops(1);
- ops[0].op = CEPH_OSD_OP_WRITE;
- ops[0].offset = ofs;
- ops[0].length = len;
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_WRITE;
+ ops[0].op.offset = ofs;
+ ops[0].op.length = len;
+ ops[0].data = *inbl;
bufferlist outbl;
- bufferlist::iterator iter = inbl->begin();
- return (*pctx)->pg->do_osd_ops(*pctx, ops, iter, outbl);
+ return (*pctx)->pg->do_osd_ops(*pctx, ops, outbl);
}
int cls_cxx_replace(cls_method_context_t hctx, int ofs, int len, bufferlist *inbl)
{
ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx;
- vector<ceph_osd_op> ops(2);
- ops[0].op = CEPH_OSD_OP_TRUNCATE;
- ops[0].offset = 0;
- ops[0].length = 0;
- ops[1].op = CEPH_OSD_OP_WRITE;
- ops[1].offset = ofs;
- ops[1].length = len;
+ vector<OSDOp> ops(2);
+ ops[0].op.op = CEPH_OSD_OP_TRUNCATE;
+ ops[0].op.offset = 0;
+ ops[0].op.length = 0;
+ ops[1].op.op = CEPH_OSD_OP_WRITE;
+ ops[1].op.offset = ofs;
+ ops[1].op.length = len;
+ ops[1].data = *inbl;
bufferlist outbl;
- bufferlist::iterator iter = inbl->begin();
- return (*pctx)->pg->do_osd_ops(*pctx, ops, iter, outbl);
+ return (*pctx)->pg->do_osd_ops(*pctx, ops, outbl);
}
// request maps from replicas
for (unsigned i=1; i<acting.size(); i++) {
dout(10) << "scrub requesting scrubmap from osd" << acting[i] << dendl;
- vector<ceph_osd_op> scrub(1);
- scrub[0].op = CEPH_OSD_OP_SCRUB;
+ vector<OSDOp> scrub(1);
+ scrub[0].op.op = CEPH_OSD_OP_SCRUB;
sobject_t poid;
eversion_t v;
osd_reqid_t reqid;
bufferlist outdata;
int result = 0;
- for (vector<ceph_osd_op>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
- switch (p->op) {
+ for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+ switch (p->op.op) {
case CEPH_OSD_OP_PGLS:
{
dout(10) << " pgls pg=" << op->get_pg() << dendl;
// read into a buffer
PGLSResponse response;
- response.handle = (collection_list_handle_t)(__u64)(p->pgls_cookie);
+ response.handle = (collection_list_handle_t)(__u64)(p->op.pgls_cookie);
vector<sobject_t> sentries;
- result = osd->store->collection_list_partial(op->get_pg().to_coll(), op->get_snapid(), sentries, p->length,
+ result = osd->store->collection_list_partial(op->get_pg().to_coll(), op->get_snapid(), sentries, p->op.length,
&response.handle);
if (!result) {
vector<sobject_t>::iterator iter;
osd->logger->inc(l_osd_subop);
if (op->ops.size() >= 1) {
- ceph_osd_op& first = op->ops[0];
- switch (first.op) {
+ OSDOp& first = op->ops[0];
+ switch (first.op.op) {
// rep stuff
case CEPH_OSD_OP_PULL:
sub_op_pull(op);
void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
{
if (r->ops.size() >= 1) {
- ceph_osd_op& first = r->ops[0];
- if (first.op == CEPH_OSD_OP_PUSH) {
+ OSDOp& first = r->ops[0];
+ if (first.op.op == CEPH_OSD_OP_PUSH) {
// continue peer recovery
sub_op_push_reply(r);
return;
}
- if (first.op == CEPH_OSD_OP_SCRUB) {
+ if (first.op.op == CEPH_OSD_OP_SCRUB) {
sub_op_scrub_reply(r);
return;
}
// ========================================================================
// low level osd ops
-int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<ceph_osd_op>& ops,
- bufferlist::iterator& bp, bufferlist& odata)
+int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
+ bufferlist& odata)
{
int result = 0;
dout(10) << "do_osd_op " << soid << " " << ops << dendl;
- for (vector<ceph_osd_op>::iterator p = ops.begin(); p != ops.end(); p++) {
- ceph_osd_op& op = *p;
+ for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); p++) {
+ OSDOp& osd_op = *p;
+ ceph_osd_op& op = osd_op.op;
bool is_modify;
string cname, mname;
+ bufferlist::iterator bp = osd_op.data.begin();
switch (op.op) {
case CEPH_OSD_OP_RDCALL:
case CEPH_OSD_OP_MASKTRUNC:
if (p != ops.begin()) {
- ceph_osd_op& rd = *(p - 1);
- ceph_osd_op& m = *p;
+ OSDOp& rd = *(p - 1);
+ OSDOp& m = *p;
// are we beyond truncate_size?
- if (rd.offset + rd.length > m.truncate_size) {
+ if (rd.op.offset + rd.op.length > m.op.truncate_size) {
__u32 seq = 0;
interval_set<__u64> tm;
if (oi.truncate_info.length()) {
}
// truncated portion of the read
- unsigned from = MAX(rd.offset, m.truncate_size); // also end of data
- unsigned to = rd.offset + rd.length;
+ unsigned from = MAX(rd.op.offset, m.op.truncate_size); // also end of data
+ unsigned to = rd.op.offset + rd.op.length;
unsigned trim = to-from;
- rd.length = rd.length - trim;
+ rd.op.length = rd.op.length - trim;
dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl;
truncated.substr_of(odata, odata.length() - trim, trim);
keep.swap(odata);
- if (seq == rd.truncate_seq) {
+ if (seq == rd.op.truncate_seq) {
// keep any valid extents beyond 'from'
unsigned data_end = from;
for (map<__u64,__u64>::iterator q = tm.m.begin();
bp.zero();
odata.push_back(bp);
dout(20) << " adding " << bp.length() << " zeros" << dendl;
- rd.length = rd.length + bp.length();
+ rd.op.length = rd.op.length + bp.length();
data_end += bp.length();
}
b.substr_of(truncated, s-from, l);
dout(20) << " adding " << b.length() << " bytes from " << s << "~" << l << dendl;
odata.claim_append(b);
- rd.length = rd.length + l;
+ rd.op.length = rd.op.length + l;
data_end += l;
}
} // for
{
// just do it inline; this works because we are happy to execute
// fancy op on replicas as well.
- vector<ceph_osd_op> nops(1);
- ceph_osd_op& newop = nops[0];
- newop.op = CEPH_OSD_OP_WRITE;
- newop.offset = old_size;
- newop.length = op.length;
- do_osd_ops(ctx, nops, bp, odata);
+ vector<OSDOp> nops(1);
+ OSDOp& newop = nops[0];
+ newop.op.op = CEPH_OSD_OP_WRITE;
+ newop.op.offset = old_size;
+ newop.op.length = op.length;
+ newop.data = osd_op.data;
+ do_osd_ops(ctx, nops, odata);
}
break;
case CEPH_OSD_OP_SETTRUNC:
if (p != ops.begin()) {
// set truncate seq over preceeding write's range
- ceph_osd_op& wr = *(p - 1);
+ OSDOp& wr = *(p - 1);
__u32 seq = 0;
interval_set<__u64> tm;
}
if (seq < op.truncate_seq) {
seq = op.truncate_seq;
- tm.insert(wr.offset, wr.length);
+ tm.insert(wr.op.offset, wr.op.length);
} else {
if (oi.truncate_info.length())
::decode(tm, p);
interval_set<__u64> n;
- n.insert(wr.offset, wr.length);
+ n.insert(wr.op.offset, wr.op.length);
tm.union_of(n);
}
dout(10) << " settrunc seq " << seq << " map " << tm << dendl;
if (op.truncate_seq > old_seq) {
// just truncate/delete.
- vector<ceph_osd_op> nops(1);
- ceph_osd_op& newop = nops[0];
- newop.op = CEPH_OSD_OP_TRUNCATE;
- newop.offset = op.truncate_size;
+ vector<OSDOp> nops(1);
+ OSDOp& newop = nops[0];
+ newop.op.op = CEPH_OSD_OP_TRUNCATE;
+ newop.op.offset = op.truncate_size;
+ newop.data = osd_op.data;
dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq
<< ", truncating with " << newop << dendl;
- do_osd_ops(ctx, nops, bp, odata);
+ do_osd_ops(ctx, nops, odata);
} else {
// do smart truncate
interval_set<__u64> tm;
for (map<__u64,__u64>::iterator p = zero.m.begin();
p != zero.m.end();
p++) {
- vector<ceph_osd_op> nops(1);
- ceph_osd_op& newop = nops[0];
- newop.op = CEPH_OSD_OP_ZERO;
- newop.offset = p->first;
- newop.length = p->second;
- do_osd_ops(ctx, nops, bp, odata);
+ vector<OSDOp> nops(1);
+ OSDOp& newop = nops[0];
+ newop.op.op = CEPH_OSD_OP_ZERO;
+ newop.op.offset = p->first;
+ newop.op.length = p->second;
+ newop.data = osd_op.data;
+ do_osd_ops(ctx, nops, odata);
}
oi.truncate_info.clear();
eversion_t old_version = poi->version;
// prepare the actual mutation
- bufferlist::iterator bp = ctx->indata.begin();
- int result = do_osd_ops(ctx, ctx->ops, bp, ctx->outdata);
+ int result = do_osd_ops(ctx, ctx->ops, ctx->outdata);
if (result < 0 || ctx->op_t.empty())
return result; // error, or read op.
// any completion stuff to do here?
const sobject_t& soid = repop->ctx->obs->oi.soid;
- ceph_osd_op& first = repop->ctx->ops[0];
+ OSDOp& first = repop->ctx->ops[0];
- switch (first.op) {
+ switch (first.op.op) {
#if 0
case CEPH_OSD_OP_UNBALANCEREADS:
dout(-10) << "apply_repop completed unbalance-reads on " << oid << dendl;
if (op->noop)
opname = "no-op";
else
- opname = ceph_osd_op_name(op->ops[0].op);
+ opname = ceph_osd_op_name(op->ops[0].op.op);
dout(10) << "sub_op_modify " << opname
<< " " << soid
tid_t tid = osd->get_tid();
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK,
osd->osdmap->get_epoch(), tid, v);
- subop->ops = vector<ceph_osd_op>(1);
- subop->ops[0].op = CEPH_OSD_OP_PULL;
+ subop->ops = vector<OSDOp>(1);
+ subop->ops[0].op.op = CEPH_OSD_OP_PULL;
subop->data_subset.swap(data_subset);
// do not include clone_subsets in pull request; we will recalculate this
// when the object is pushed back.
osd_reqid_t rid; // useless?
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
osd->osdmap->get_epoch(), osd->get_tid(), oi.version);
- subop->ops = vector<ceph_osd_op>(1);
- subop->ops[0].op = CEPH_OSD_OP_PUSH;
- subop->ops[0].offset = 0;
- subop->ops[0].length = size;
+ subop->ops = vector<OSDOp>(1);
+ subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+ subop->ops[0].op.offset = 0;
+ subop->ops[0].op.length = size;
+ subop->ops[0].data = bl;
subop->data_subset.swap(data_subset);
subop->clone_subsets.swap(clone_subsets);
- subop->set_data(bl); // note: claims bl, set length above here!
subop->attrset.swap(attrset);
osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
{
const sobject_t& soid = op->poid;
eversion_t v = op->version;
- ceph_osd_op& push = op->ops[0];
+ OSDOp& push = op->ops[0];
dout(7) << "op_push "
<< soid
<< " v " << v
- << " len " << push.length
+ << " len " << push.op.length
<< " data_subset " << op->data_subset
<< " clone_subsets " << op->clone_subsets
<< " data len " << op->get_data().length()
// determine data/clone subsets
data_subset = op->data_subset;
- if (data_subset.empty() && push.length && push.length == data.length())
- data_subset.insert(0, push.length);
+ if (data_subset.empty() && push.op.length && push.op.length == data.length())
+ data_subset.insert(0, push.op.length);
clone_subsets = op->clone_subsets;
if (is_primary()) {
struct OpContext {
Message *op;
osd_reqid_t reqid;
- vector<ceph_osd_op>& ops;
+ vector<OSDOp>& ops;
bufferlist& indata;
bufferlist outdata;
ReplicatedPG *pg;
- OpContext(Message *_op, osd_reqid_t _reqid, vector<ceph_osd_op>& _ops, bufferlist& _data,
+ OpContext(Message *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops, bufferlist& _data,
ObjectContext::state_t _mode, ObjectState *_obs, ReplicatedPG *_pg) :
op(_op), reqid(_reqid), ops(_ops), indata(_data), mode(_mode), obs(_obs),
clone_obc(0), data_off(0), pg(_pg) {}
void do_sub_op(MOSDSubOp *op);
void do_sub_op_reply(MOSDSubOpReply *op);
bool snap_trimmer();
- int do_osd_ops(OpContext *ctx, vector<ceph_osd_op>& ops,
- bufferlist::iterator& bp, bufferlist& odata);
+ int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
+ bufferlist& odata);
bool same_for_read_since(epoch_t e);
bool same_for_modify_since(epoch_t e);
WRITE_CLASS_ENCODER(ScrubMap::object)
WRITE_CLASS_ENCODER(ScrubMap)
+
+struct OSDOp {
+ ceph_osd_op op;
+ bufferlist data;
+
+ OSDOp() {
+ memset(&op, 0, sizeof(ceph_osd_op));
+ }
+};
+
+inline ostream& operator<<(ostream& out, const struct OSDOp& op) {
+ out << op.op << " (data length=" << op.data.length() << ")";
+ return out;
+}
+
+
#endif
vector<ObjectExtent> extents;
file_to_extents(ino, layout, offset, len, extents);
if (extents.size() == 1) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_TRIMTRUNC;
- ops[0].truncate_seq = truncate_seq;
- ops[0].truncate_size = extents[0].offset;
- objecter->modify(extents[0].oid, extents[0].layout, ops, snapc, bl, mtime, flags, onack, oncommit);
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
+ ops[0].op.truncate_seq = truncate_seq;
+ ops[0].op.truncate_size = extents[0].offset;
+ ops[0].data = bl;
+ objecter->modify(extents[0].oid, extents[0].layout, ops, snapc, mtime, flags, onack, oncommit);
} else {
C_Gather *gack = 0, *gcom = 0;
if (onack)
if (oncommit)
gcom = new C_Gather(oncommit);
for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
- vector<ceph_osd_op> ops(1);
+ vector<OSDOp> ops(1);
memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_TRIMTRUNC;
- ops[0].truncate_size = p->offset;
- ops[0].truncate_seq = truncate_seq;
- objecter->modify(extents[0].oid, p->layout, ops, snapc, bl, mtime, flags,
+ ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
+ ops[0].op.truncate_size = p->offset;
+ ops[0].op.truncate_seq = truncate_seq;
+ ops[0].data = bl;
+ objecter->modify(extents[0].oid, p->layout, ops, snapc, mtime, flags,
gack ? gack->new_sub():0,
gcom ? gcom->new_sub():0);
}
m->set_snap_seq(0);
m->ops = rd->ops;
- m->set_data(rd->bl);
m->set_retry_attempt(rd->attempts++);
int who = pg.acker();
if (wr->version != eversion_t())
m->set_version(wr->version); // we're replaying this op!
- m->set_data(wr->bl);
-
messenger->send_message(m, osdmap->get_inst(pg.primary()));
} else
maybe_request_map();
// -----------------------------------------
struct ObjectOperation {
- vector<ceph_osd_op> ops;
- bufferlist data;
+ vector<OSDOp> ops;
int flags;
- void add_data(int op, __u64 off, __u64 len) {
+ void add_data(int op, __u64 off, __u64 len, bufferlist& bl) {
int s = ops.size();
ops.resize(s+1);
- memset(&ops[s], 0, sizeof(ops[s]));
- ops[s].op = op;
- ops[s].offset = off;
- ops[s].length = len;
+ ops[s].op.op = op;
+ ops[s].op.offset = off;
+ ops[s].op.length = len;
+ ops[s].data.claim_append(bl);
}
- void add_xattr(int op, int namelen, int valuelen) {
+ void add_xattr(int op, const char *name, const bufferlist& data) {
int s = ops.size();
ops.resize(s+1);
- memset(&ops[s], 0, sizeof(ops[s]));
- ops[s].op = op;
- ops[s].name_len = namelen;
- ops[s].value_len = valuelen;
+ ops[s].op.op = op;
+ ops[s].op.name_len = (name ? strlen(name) : 0);
+ ops[s].op.value_len = data.length();
+ if (name)
+ ops[s].data.append(name);
+ ops[s].data.append(data);
}
void add_call(int op, const char *cname, const char *method, bufferlist &indata) {
int s = ops.size();
ops.resize(s+1);
- memset(&ops[s], 0, sizeof(ops[s]));
- ops[s].op = op;
- ops[s].class_len = strlen(cname);
- ops[s].method_len = strlen(method);
- ops[s].indata_len = indata.length();
- data.append(cname, ops[s].class_len);
- data.append(method, ops[s].method_len);
- data.append(indata);
+ ops[s].op.op = op;
+ ops[s].op.class_len = strlen(cname);
+ ops[s].op.method_len = strlen(method);
+ ops[s].op.indata_len = indata.length();
+ ops[s].data.append(cname, ops[s].op.class_len);
+ ops[s].data.append(method, ops[s].op.method_len);
+ ops[s].data.append(indata);
}
void add_pgls(int op, __u64 count, __u64 cookie) {
int s = ops.size();
ops.resize(s+1);
- memset(&ops[s], 0, sizeof(ops[s]));
- ops[s].op = op;
- ops[s].count = count;
- ops[s].pgls_cookie = cookie;
+ ops[s].op.op = op;
+ ops[s].op.count = count;
+ ops[s].op.pgls_cookie = cookie;
}
ObjectOperation() : flags(0) {}
struct ObjectRead : public ObjectOperation {
void read(__u64 off, __u64 len) {
- add_data(CEPH_OSD_OP_READ, off, len);
+ bufferlist bl;
+ add_data(CEPH_OSD_OP_READ, off, len, bl);
}
void getxattr(const char *name) {
- int l = strlen(name);
- add_xattr(CEPH_OSD_OP_GETXATTR, l, 0);
- data.append(name, l);
+ bufferlist bl;
+ add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
}
void getxattrs() {
- add_xattr(CEPH_OSD_OP_GETXATTRS, 0, 0);
+ bufferlist bl;
+ add_xattr(CEPH_OSD_OP_GETXATTRS, 0, bl);
}
void rdcall(const char *cname, const char *method, bufferlist &indata) {
// object data
void write(__u64 off, __u64 len, bufferlist& bl) {
- add_data(CEPH_OSD_OP_WRITE, off, len);
- data.claim_append(bl);
+ add_data(CEPH_OSD_OP_WRITE, off, len, bl);
}
void write_full(bufferlist& bl) {
- add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
- data.claim_append(bl);
+ add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
}
void zero(__u64 off, __u64 len) {
- add_data(CEPH_OSD_OP_ZERO, off, len);
+ bufferlist bl;
+ add_data(CEPH_OSD_OP_ZERO, off, len, bl);
}
void remove() {
- add_data(CEPH_OSD_OP_DELETE, 0, 0);
+ bufferlist bl;
+ add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
}
// object attrs
void setxattr(const char *name, const bufferlist& bl) {
- int l = strlen(name);
- add_xattr(CEPH_OSD_OP_SETXATTR, l, bl.length());
- data.append(name, l);
- data.append(bl);
+ add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
}
void setxattr(const char *name, const string& s) {
- int l = strlen(name);
- add_xattr(CEPH_OSD_OP_SETXATTR, l, s.length());
- data.append(name, l);
- data.append(s);
+ bufferlist bl;
+ bl.append(s);
+ add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
}
void rmxattr(const char *name) {
- int l = strlen(name);
- add_xattr(CEPH_OSD_OP_RMXATTR, l, 0);
- data.append(name, l);
+ bufferlist bl;
+ add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
}
void setxattrs(map<string, bufferlist>& attrs) {
bufferlist bl;
::encode(attrs, bl);
add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
- data.claim_append(bl);
}
void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
- int l = strlen(prefix);
bufferlist bl;
::encode(attrs, bl);
- add_xattr(CEPH_OSD_OP_RESETXATTRS, l, bl.length());
- data.append(prefix, l);
- data.claim_append(bl);
+ add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
}
};
struct ReadOp {
object_t oid;
ceph_object_layout layout;
- vector<ceph_osd_op> ops;
+ vector<OSDOp> ops;
snapid_t snap;
- bufferlist bl;
bufferlist *pbl;
int flags;
bool paused;
- ReadOp(const object_t& o, ceph_object_layout& ol, vector<ceph_osd_op>& op, snapid_t s, int f, Context *of) :
+ ReadOp(const object_t& o, ceph_object_layout& ol, vector<OSDOp>& op, snapid_t s, int f, Context *of) :
oid(o), layout(ol), snap(s),
pbl(0), flags(f), onfinish(of),
tid(0), attempts(0),
object_t oid;
ceph_object_layout layout;
SnapContext snapc;
- vector<ceph_osd_op> ops;
+ vector<OSDOp> ops;
utime_t mtime;
bufferlist bl;
int flags;
bool paused;
- ModifyOp(const object_t& o, ceph_object_layout& l, vector<ceph_osd_op>& op, utime_t mt,
+ ModifyOp(const object_t& o, ceph_object_layout& l, vector<OSDOp>& op, utime_t mt,
const SnapContext& sc, int f, Context *ac, Context *co) :
oid(o), layout(l), snapc(sc), mtime(mt), flags(f), onack(ac), oncommit(co),
tid(0), attempts(0),
tid_t read_submit(ReadOp *rd);
tid_t modify_submit(ModifyOp *wr);
- tid_t read(const object_t& oid, ceph_object_layout ol, vector<ceph_osd_op>& ops,
+ tid_t read(const object_t& oid, ceph_object_layout ol, vector<OSDOp>& ops,
snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish) {
ReadOp *rd = new ReadOp(oid, ol, ops, snap, flags, onfinish);
tid_t read(const object_t& oid, ceph_object_layout ol,
ObjectRead& read, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish) {
ReadOp *rd = new ReadOp(oid, ol, read.ops, snap, read.flags | flags, onfinish);
- rd->bl = read.data;
rd->pbl = pbl;
return read_submit(rd);
}
- tid_t modify(const object_t& oid, ceph_object_layout ol, vector<ceph_osd_op>& ops,
- const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags,
+ tid_t modify(const object_t& oid, ceph_object_layout ol, vector<OSDOp>& ops,
+ const SnapContext& snapc, utime_t mtime, int flags,
Context *onack, Context *oncommit) {
ModifyOp *wr = new ModifyOp(oid, ol, ops, mtime, snapc, flags, onack, oncommit);
- wr->bl = bl;
return modify_submit(wr);
}
tid_t stat(const object_t& oid, ceph_object_layout ol, snapid_t snap,
__u64 *psize, utime_t *pmtime, int flags,
Context *onfinish) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_STAT;
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_STAT;
C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
return read(oid, ol, ops, snap, &fin->bl, flags, fin);
}
tid_t read(const object_t& oid, ceph_object_layout ol,
__u64 off, size_t len, snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_READ;
- ops[0].offset = off;
- ops[0].length = len;
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_READ;
+ ops[0].op.offset = off;
+ ops[0].op.length = len;
return read(oid, ol, ops, snap, pbl, flags, onfinish);
}
tid_t read_full(const object_t& oid, ceph_object_layout ol,
ObjectMutation& mutation,
const SnapContext& snapc, int flags,
Context *onack, Context *oncommit) {
- return modify(oid, ol, mutation.ops, snapc, mutation.data, mutation.mtime, flags, onack, oncommit);
+ return modify(oid, ol, mutation.ops, snapc, mutation.mtime, flags, onack, oncommit);
}
tid_t write(const object_t& oid, ceph_object_layout ol,
__u64 off, size_t len, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags,
Context *onack, Context *oncommit) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_WRITE;
- ops[0].offset = off;
- ops[0].length = len;
- return modify(oid, ol, ops, snapc, bl, mtime, flags, onack, oncommit);
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_WRITE;
+ ops[0].op.offset = off;
+ ops[0].op.length = len;
+ ops[0].data = bl;
+ return modify(oid, ol, ops, snapc, mtime, flags, onack, oncommit);
}
tid_t write_full(const object_t& oid, ceph_object_layout ol,
const SnapContext& snapc, bufferlist &bl, utime_t mtime, int flags,
Context *onack, Context *oncommit) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_WRITEFULL;
- ops[0].offset = 0;
- ops[0].length = bl.length();
- return modify(oid, ol, ops, snapc, bl, mtime, flags, onack, oncommit);
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_WRITEFULL;
+ ops[0].op.offset = 0;
+ ops[0].op.length = bl.length();
+ ops[0].data = bl;
+ return modify(oid, ol, ops, snapc, mtime, flags, onack, oncommit);
}
tid_t zero(const object_t& oid, ceph_object_layout ol,
__u64 off, size_t len, const SnapContext& snapc, utime_t mtime, int flags,
Context *onack, Context *oncommit) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_ZERO;
- ops[0].offset = off;
- ops[0].length = len;
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_ZERO;
+ ops[0].op.offset = off;
+ ops[0].op.length = len;
return modify_submit(new ModifyOp(oid, ol, ops, mtime, snapc, flags, onack, oncommit));
}
tid_t remove(const object_t& oid, ceph_object_layout ol,
const SnapContext& snapc, utime_t mtime, int flags,
Context *onack, Context *oncommit) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_DELETE;
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_DELETE;
return modify_submit(new ModifyOp(oid, ol, ops, mtime, snapc, flags, onack, oncommit));
}
tid_t lock(const object_t& oid, ceph_object_layout ol, int op, int flags, Context *onack, Context *oncommit) {
SnapContext snapc; // no snapc for lock ops
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = op;
+ vector<OSDOp> ops(1);
+ ops[0].op.op = op;
return modify_submit(new ModifyOp(oid, ol, ops, utime_t(), snapc, flags, onack, oncommit));
}