From 7a624a4a8a449e1ccbfb60319599d7c307282018 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 9 Jun 2009 16:33:30 -0700 Subject: [PATCH] osd: hold op data on the specific op --- src/client/SyntheticClient.cc | 16 ++-- src/include/rados.h | 1 + src/kernel/addr.c | 1 + src/kernel/osd_client.c | 2 + src/messages/MOSDOp.h | 28 ++++-- src/messages/MOSDOpReply.h | 16 +++- src/messages/MOSDSubOp.h | 24 ++++- src/messages/MOSDSubOpReply.h | 21 ++++- src/objclass/class_api.cc | 89 +++++++++--------- src/osd/PG.cc | 4 +- src/osd/ReplicatedPG.cc | 120 ++++++++++++------------ src/osd/ReplicatedPG.h | 8 +- src/osd/osd_types.h | 16 ++++ src/osdc/Filer.h | 23 ++--- src/osdc/Objecter.cc | 3 - src/osdc/Objecter.h | 169 +++++++++++++++------------------- 16 files changed, 298 insertions(+), 243 deletions(-) diff --git a/src/client/SyntheticClient.cc b/src/client/SyntheticClient.cc index 424012077fa55..65e7684de43eb 100644 --- a/src/client/SyntheticClient.cc +++ b/src/client/SyntheticClient.cc @@ -2259,15 +2259,17 @@ int SyntheticClient::object_rw(int nobj, int osize, int wrpc, dout(10) << "write to " << oid << dendl; ObjectMutation m; - m.ops.push_back(ceph_osd_op()); - m.ops[0].op = CEPH_OSD_OP_WRITE; - m.ops[0].offset = 0; - m.ops[0].length = osize; + OSDOp op; + op.op.op = CEPH_OSD_OP_WRITE; + op.op.offset = 0; + op.op.length = osize; + op.data = bl; + m.ops.push_back(op); if (do_sync) { - m.ops.push_back(ceph_osd_op()); - m.ops[1].op = CEPH_OSD_OP_STARTSYNC; + OSDOp op; + op.op.op = CEPH_OSD_OP_STARTSYNC; + m.ops.push_back(op); } - m.data = bl; client->objecter->mutate(oid, layout, m, snapc, 0, NULL, new C_Ref(lock, cond, &unack)); /*client->objecter->write(oid, layout, 0, osize, snapc, bl, 0, diff --git a/src/include/rados.h b/src/include/rados.h index b01ef8bf06af7..153110c03539e 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -352,6 +352,7 @@ struct ceph_osd_op { __le64 pgls_cookie, count; }; }; + __le32 payload_len; } __attribute__ ((packed)); struct ceph_osd_request_head { diff --git a/src/kernel/addr.c b/src/kernel/addr.c index 44e24d72556ca..df502975eb33c 100644 --- a/src/kernel/addr.c +++ b/src/kernel/addr.c @@ -811,6 +811,7 @@ get_more_pages: reqhead = req->r_request->front.iov_base; op = (void *)(reqhead + 1); op->length = cpu_to_le64(len); + op->payload_len = op->length; req->r_request->hdr.data_len = cpu_to_le32(len); rc = ceph_osdc_start_request(&client->osdc, req); diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 0d852e7e83e0b..cbf2e99839fae 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -148,8 +148,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); req->r_request->hdr.data_len = cpu_to_le32(*plen); + op->payload_len = cpu_to_le32(*plen); } + /* fill in oid, ticket */ head->object_len = cpu_to_le32(req->r_oid_len); memcpy(p, req->r_oid, req->r_oid_len); diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 91f3d35c5aab5..8d62596ca02fe 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -34,7 +34,7 @@ private: ceph_osd_request_head head; public: object_t oid; - vector ops; + vector ops; bufferlist ticket; vector snaps; osd_peer_stat_t peer_stat; @@ -109,11 +109,11 @@ public: // ops void add_simple_op(int o, __u64 off, __u64 len) { - ceph_osd_op op; - op.op = o; - op.offset = off; - op.length = len; - ops.push_back(op); + OSDOp osd_op; + osd_op.op.op = o; + osd_op.op.offset = off; + osd_op.op.length = len; + ops.push_back(osd_op); } void write(__u64 off, __u64 len, bufferlist& bl) { add_simple_op(CEPH_OSD_OP_WRITE, off, len); @@ -168,7 +168,13 @@ public: head.num_ops = ops.size(); head.ticket_len = ticket.length(); ::encode(head, payload); - ::encode_nohead(ops, payload); + + for (unsigned i = 0; i < head.num_ops; i++) { + ops[i].op.payload_len = ops[i].data.length(); + ::encode(ops[i].op, payload); + data.append(ops[i].data); + } + ::encode_nohead(oid.name, payload); ::encode_nohead(ticket, payload); ::encode_nohead(snaps, payload); @@ -179,7 +185,13 @@ public: virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(head, p); - decode_nohead(head.num_ops, ops, p); + ops.resize(head.num_ops); + unsigned off = 0; + for (unsigned i = 0; i < head.num_ops; i++) { + ::decode(ops[i].op, p); + ops[i].data.substr_of(data, off, ops[i].op.payload_len); + off += ops[i].op.payload_len; + } decode_nohead(head.object_len, oid.name, p); decode_nohead(head.ticket_len, ticket, p); decode_nohead(head.num_snaps, snaps, p); diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h index 01fe8b92d70a0..7585aa8366b21 100644 --- a/src/messages/MOSDOpReply.h +++ b/src/messages/MOSDOpReply.h @@ -33,7 +33,7 @@ class MOSDOpReply : public Message { ceph_osd_reply_head head; public: object_t oid; - vector ops; + vector ops; long get_tid() { return head.tid; } object_t get_oid() { return oid; } @@ -81,14 +81,24 @@ public: virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(head, p); - ::decode_nohead(head.num_ops, ops, p); + ops.resize(head.num_ops); + unsigned off = 0; + for (unsigned i = 0; i < head.num_ops; i++) { + ::decode(ops[i].op, p); + ops[i].data.substr_of(data, off, ops[i].op.payload_len); + off += ops[i].op.payload_len; + } ::decode_nohead(head.object_len, oid.name, p); } virtual void encode_payload() { head.num_ops = ops.size(); head.object_len = oid.name.length(); ::encode(head, payload); - ::encode_nohead(ops, payload); + for (unsigned i = 0; i < head.num_ops; i++) { + ops[i].op.payload_len = ops[i].data.length(); + ::encode(ops[i].op, payload); + data.append(ops[i].data); + } ::encode_nohead(oid.name, payload); } diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index 4aec50b8f0d8b..9109ddf674671 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -37,7 +37,7 @@ public: __u8 acks_wanted; // op to exec - vector ops; + vector ops; utime_t mtime; bool noop; @@ -70,7 +70,16 @@ public: ::decode(reqid, p); ::decode(pgid, p); ::decode(poid, p); - ::decode(ops, p); + + unsigned num_ops; + ::decode(num_ops, p); + ops.resize(num_ops); + unsigned off = 0; + for (unsigned i = 0; i < num_ops; i++) { + ::decode(ops[i].op, p); + ops[i].data.substr_of(data, off, ops[i].op.payload_len); + off += ops[i].op.payload_len; + } ::decode(mtime, p); ::decode(noop, p); ::decode(acks_wanted, p); @@ -94,7 +103,14 @@ public: ::encode(reqid, payload); ::encode(pgid, payload); ::encode(poid, payload); - ::encode(ops, payload); + + __u32 num_ops = ops.size(); + ::encode(num_ops, payload); + for (unsigned i = 0; i < ops.size(); i++) { + ops[i].op.payload_len = ops[i].data.length(); + ::encode(ops[i].op, payload); + data.append(ops[i].data); + } ::encode(mtime, payload); ::encode(noop, payload); ::encode(acks_wanted, payload); @@ -112,7 +128,7 @@ public: ::encode(data_subset, payload); ::encode(clone_subsets, payload); if (ops.size()) - header.data_off = ops[0].offset; + header.data_off = ops[0].op.offset; else header.data_off = 0; } diff --git a/src/messages/MOSDSubOpReply.h b/src/messages/MOSDSubOpReply.h index e05d905351539..718087aae08bc 100644 --- a/src/messages/MOSDSubOpReply.h +++ b/src/messages/MOSDSubOpReply.h @@ -39,7 +39,7 @@ public: tid_t rep_tid; sobject_t poid; - vector ops; + vector ops; // result __u8 ack_type; @@ -58,7 +58,16 @@ public: ::decode(pgid, p); ::decode(rep_tid, p); ::decode(poid, p); - ::decode(ops, p); + + unsigned num_ops; + ::decode(num_ops, p); + ops.resize(num_ops); + unsigned off = 0; + for (unsigned i = 0; i < num_ops; i++) { + ::decode(ops[i].op, p); + ops[i].data.substr_of(data, off, ops[i].op.payload_len); + off += ops[i].op.payload_len; + } ::decode(ack_type, p); ::decode(result, p); ::decode(pg_complete_thru, p); @@ -71,7 +80,13 @@ public: ::encode(pgid, payload); ::encode(rep_tid, payload); ::encode(poid, payload); - ::encode(ops, payload); + __u32 num_ops = ops.size(); + ::encode(num_ops, payload); + for (unsigned i = 0; i < ops.size(); i++) { + ops[i].op.payload_len = ops[i].data.length(); + ::encode(ops[i].op, payload); + data.append(ops[i].data); + } ::encode(ack_type, payload); ::encode(result, payload); ::encode(pg_complete_thru, payload); diff --git a/src/objclass/class_api.cc b/src/objclass/class_api.cc index 6126d578026c1..15c86ea740039 100644 --- a/src/objclass/class_api.cc +++ b/src/objclass/class_api.cc @@ -93,19 +93,18 @@ int cls_rdcall(cls_method_context_t hctx, const char *cls, const char *method, ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx; bufferlist odata; bufferlist idata; - vector nops(1); - ceph_osd_op& op = nops[0]; + vector nops(1); + OSDOp& op = nops[0]; int r; - op.op = CEPH_OSD_OP_RDCALL; - op.class_len = strlen(cls); - op.method_len = strlen(method); - op.indata_len = datalen; - idata.append(cls, op.class_len); - idata.append(method, op.method_len); - idata.append(indata, datalen); - bufferlist::iterator iter = idata.begin(); - r = (*pctx)->pg->do_osd_ops(*pctx, nops, iter, odata); + op.op.op = CEPH_OSD_OP_RDCALL; + op.op.class_len = strlen(cls); + op.op.method_len = strlen(method); + op.op.indata_len = datalen; + op.data.append(cls, op.op.class_len); + op.data.append(method, op.op.method_len); + op.data.append(indata, datalen); + r = (*pctx)->pg->do_osd_ops(*pctx, nops, odata); *outdata = (char *)malloc(odata.length()); memcpy(*outdata, odata.c_str(), odata.length()); @@ -120,15 +119,14 @@ int cls_getxattr(cls_method_context_t hctx, const char *name, ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx; bufferlist name_data; bufferlist odata; - vector nops(1); - ceph_osd_op& op = nops[0]; + vector nops(1); + OSDOp& op = nops[0]; int r; - op.op = CEPH_OSD_OP_GETXATTR; - name_data.append(name); - op.name_len = strlen(name); - bufferlist::iterator iter = name_data.begin(); - r = (*pctx)->pg->do_osd_ops(*pctx, nops, iter, odata); + op.op.op = CEPH_OSD_OP_GETXATTR; + op.data.append(name); + op.op.name_len = strlen(name); + r = (*pctx)->pg->do_osd_ops(*pctx, nops, odata); *outdata = (char *)malloc(odata.length()); memcpy(*outdata, odata.c_str(), odata.length()); @@ -141,13 +139,12 @@ int cls_read(cls_method_context_t hctx, int ofs, int len, char **outdata, int *outdatalen) { ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx; - vector ops(1); - ops[0].op = CEPH_OSD_OP_READ; - ops[0].offset = ofs; - ops[0].length = len; - bufferlist idata, odata; - bufferlist::iterator iter = idata.begin(); - int r = (*pctx)->pg->do_osd_ops(*pctx, ops, iter, odata); + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_READ; + ops[0].op.offset = ofs; + ops[0].op.length = len; + bufferlist odata; + int r = (*pctx)->pg->do_osd_ops(*pctx, ops, odata); *outdata = (char *)malloc(odata.length()); memcpy(*outdata, odata.c_str(), odata.length()); @@ -159,39 +156,37 @@ int cls_read(cls_method_context_t hctx, int ofs, int len, int cls_cxx_read(cls_method_context_t hctx, int ofs, int len, bufferlist *outbl) { ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx; - vector ops(1); - ops[0].op = CEPH_OSD_OP_READ; - ops[0].offset = ofs; - ops[0].length = len; - bufferlist idata; - bufferlist::iterator iter = idata.begin(); - return (*pctx)->pg->do_osd_ops(*pctx, ops, iter, *outbl); + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_READ; + ops[0].op.offset = ofs; + ops[0].op.length = len; + return (*pctx)->pg->do_osd_ops(*pctx, ops, *outbl); } int cls_cxx_write(cls_method_context_t hctx, int ofs, int len, bufferlist *inbl) { ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx; - vector ops(1); - ops[0].op = CEPH_OSD_OP_WRITE; - ops[0].offset = ofs; - ops[0].length = len; + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_WRITE; + ops[0].op.offset = ofs; + ops[0].op.length = len; + ops[0].data = *inbl; bufferlist outbl; - bufferlist::iterator iter = inbl->begin(); - return (*pctx)->pg->do_osd_ops(*pctx, ops, iter, outbl); + return (*pctx)->pg->do_osd_ops(*pctx, ops, outbl); } int cls_cxx_replace(cls_method_context_t hctx, int ofs, int len, bufferlist *inbl) { ReplicatedPG::OpContext **pctx = (ReplicatedPG::OpContext **)hctx; - vector ops(2); - ops[0].op = CEPH_OSD_OP_TRUNCATE; - ops[0].offset = 0; - ops[0].length = 0; - ops[1].op = CEPH_OSD_OP_WRITE; - ops[1].offset = ofs; - ops[1].length = len; + vector ops(2); + ops[0].op.op = CEPH_OSD_OP_TRUNCATE; + ops[0].op.offset = 0; + ops[0].op.length = 0; + ops[1].op.op = CEPH_OSD_OP_WRITE; + ops[1].op.offset = ofs; + ops[1].op.length = len; + ops[1].data = *inbl; bufferlist outbl; - bufferlist::iterator iter = inbl->begin(); - return (*pctx)->pg->do_osd_ops(*pctx, ops, iter, outbl); + return (*pctx)->pg->do_osd_ops(*pctx, ops, outbl); } diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 62bc245cc5480..74f94fc994278 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -2119,8 +2119,8 @@ void PG::scrub() // request maps from replicas for (unsigned i=1; i scrub(1); - scrub[0].op = CEPH_OSD_OP_SCRUB; + vector scrub(1); + scrub[0].op.op = CEPH_OSD_OP_SCRUB; sobject_t poid; eversion_t v; osd_reqid_t reqid; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index bfa795c7ce108..ee38bbd44bd66 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -375,16 +375,16 @@ void ReplicatedPG::do_pg_op(MOSDOp *op) bufferlist outdata; int result = 0; - for (vector::iterator p = op->ops.begin(); p != op->ops.end(); p++) { - switch (p->op) { + for (vector::iterator p = op->ops.begin(); p != op->ops.end(); p++) { + switch (p->op.op) { case CEPH_OSD_OP_PGLS: { dout(10) << " pgls pg=" << op->get_pg() << dendl; // read into a buffer PGLSResponse response; - response.handle = (collection_list_handle_t)(__u64)(p->pgls_cookie); + response.handle = (collection_list_handle_t)(__u64)(p->op.pgls_cookie); vector sentries; - result = osd->store->collection_list_partial(op->get_pg().to_coll(), op->get_snapid(), sentries, p->length, + result = osd->store->collection_list_partial(op->get_pg().to_coll(), op->get_snapid(), sentries, p->op.length, &response.handle); if (!result) { vector::iterator iter; @@ -633,8 +633,8 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op) osd->logger->inc(l_osd_subop); if (op->ops.size() >= 1) { - ceph_osd_op& first = op->ops[0]; - switch (first.op) { + OSDOp& first = op->ops[0]; + switch (first.op.op) { // rep stuff case CEPH_OSD_OP_PULL: sub_op_pull(op); @@ -654,13 +654,13 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op) void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r) { if (r->ops.size() >= 1) { - ceph_osd_op& first = r->ops[0]; - if (first.op == CEPH_OSD_OP_PUSH) { + OSDOp& first = r->ops[0]; + if (first.op.op == CEPH_OSD_OP_PUSH) { // continue peer recovery sub_op_push_reply(r); return; } - if (first.op == CEPH_OSD_OP_SCRUB) { + if (first.op.op == CEPH_OSD_OP_SCRUB) { sub_op_scrub_reply(r); return; } @@ -813,8 +813,8 @@ bool ReplicatedPG::snap_trimmer() // ======================================================================== // low level osd ops -int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, - bufferlist::iterator& bp, bufferlist& odata) +int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, + bufferlist& odata) { int result = 0; @@ -827,10 +827,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, dout(10) << "do_osd_op " << soid << " " << ops << dendl; - for (vector::iterator p = ops.begin(); p != ops.end(); p++) { - ceph_osd_op& op = *p; + for (vector::iterator p = ops.begin(); p != ops.end(); p++) { + OSDOp& osd_op = *p; + ceph_osd_op& op = osd_op.op; bool is_modify; string cname, mname; + bufferlist::iterator bp = osd_op.data.begin(); switch (op.op) { case CEPH_OSD_OP_RDCALL: @@ -949,11 +951,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_MASKTRUNC: if (p != ops.begin()) { - ceph_osd_op& rd = *(p - 1); - ceph_osd_op& m = *p; + OSDOp& rd = *(p - 1); + OSDOp& m = *p; // are we beyond truncate_size? - if (rd.offset + rd.length > m.truncate_size) { + if (rd.op.offset + rd.op.length > m.op.truncate_size) { __u32 seq = 0; interval_set<__u64> tm; if (oi.truncate_info.length()) { @@ -963,11 +965,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, } // truncated portion of the read - unsigned from = MAX(rd.offset, m.truncate_size); // also end of data - unsigned to = rd.offset + rd.length; + unsigned from = MAX(rd.op.offset, m.op.truncate_size); // also end of data + unsigned to = rd.op.offset + rd.op.length; unsigned trim = to-from; - rd.length = rd.length - trim; + rd.op.length = rd.op.length - trim; dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl; @@ -977,7 +979,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, truncated.substr_of(odata, odata.length() - trim, trim); keep.swap(odata); - if (seq == rd.truncate_seq) { + if (seq == rd.op.truncate_seq) { // keep any valid extents beyond 'from' unsigned data_end = from; for (map<__u64,__u64>::iterator q = tm.m.begin(); @@ -995,7 +997,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, bp.zero(); odata.push_back(bp); dout(20) << " adding " << bp.length() << " zeros" << dendl; - rd.length = rd.length + bp.length(); + rd.op.length = rd.op.length + bp.length(); data_end += bp.length(); } @@ -1003,7 +1005,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, b.substr_of(truncated, s-from, l); dout(20) << " adding " << b.length() << " bytes from " << s << "~" << l << dendl; odata.claim_append(b); - rd.length = rd.length + l; + rd.op.length = rd.op.length + l; data_end += l; } } // for @@ -1163,12 +1165,13 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, { // just do it inline; this works because we are happy to execute // fancy op on replicas as well. - vector nops(1); - ceph_osd_op& newop = nops[0]; - newop.op = CEPH_OSD_OP_WRITE; - newop.offset = old_size; - newop.length = op.length; - do_osd_ops(ctx, nops, bp, odata); + vector nops(1); + OSDOp& newop = nops[0]; + newop.op.op = CEPH_OSD_OP_WRITE; + newop.op.offset = old_size; + newop.op.length = op.length; + newop.data = osd_op.data; + do_osd_ops(ctx, nops, odata); } break; @@ -1179,7 +1182,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_SETTRUNC: if (p != ops.begin()) { // set truncate seq over preceeding write's range - ceph_osd_op& wr = *(p - 1); + OSDOp& wr = *(p - 1); __u32 seq = 0; interval_set<__u64> tm; @@ -1190,12 +1193,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, } if (seq < op.truncate_seq) { seq = op.truncate_seq; - tm.insert(wr.offset, wr.length); + tm.insert(wr.op.offset, wr.op.length); } else { if (oi.truncate_info.length()) ::decode(tm, p); interval_set<__u64> n; - n.insert(wr.offset, wr.length); + n.insert(wr.op.offset, wr.op.length); tm.union_of(n); } dout(10) << " settrunc seq " << seq << " map " << tm << dendl; @@ -1216,13 +1219,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, if (op.truncate_seq > old_seq) { // just truncate/delete. - vector nops(1); - ceph_osd_op& newop = nops[0]; - newop.op = CEPH_OSD_OP_TRUNCATE; - newop.offset = op.truncate_size; + vector nops(1); + OSDOp& newop = nops[0]; + newop.op.op = CEPH_OSD_OP_TRUNCATE; + newop.op.offset = op.truncate_size; + newop.data = osd_op.data; dout(10) << " seq " << op.truncate_seq << " > old_seq " << old_seq << ", truncating with " << newop << dendl; - do_osd_ops(ctx, nops, bp, odata); + do_osd_ops(ctx, nops, odata); } else { // do smart truncate interval_set<__u64> tm; @@ -1238,12 +1242,13 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, for (map<__u64,__u64>::iterator p = zero.m.begin(); p != zero.m.end(); p++) { - vector nops(1); - ceph_osd_op& newop = nops[0]; - newop.op = CEPH_OSD_OP_ZERO; - newop.offset = p->first; - newop.length = p->second; - do_osd_ops(ctx, nops, bp, odata); + vector nops(1); + OSDOp& newop = nops[0]; + newop.op.op = CEPH_OSD_OP_ZERO; + newop.op.offset = p->first; + newop.op.length = p->second; + newop.data = osd_op.data; + do_osd_ops(ctx, nops, odata); } oi.truncate_info.clear(); @@ -1386,8 +1391,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) eversion_t old_version = poi->version; // prepare the actual mutation - bufferlist::iterator bp = ctx->indata.begin(); - int result = do_osd_ops(ctx, ctx->ops, bp, ctx->outdata); + int result = do_osd_ops(ctx, ctx->ops, ctx->outdata); if (result < 0 || ctx->op_t.empty()) return result; // error, or read op. @@ -1522,9 +1526,9 @@ void ReplicatedPG::apply_repop(RepGather *repop) // any completion stuff to do here? const sobject_t& soid = repop->ctx->obs->oi.soid; - ceph_osd_op& first = repop->ctx->ops[0]; + OSDOp& first = repop->ctx->ops[0]; - switch (first.op) { + switch (first.op.op) { #if 0 case CEPH_OSD_OP_UNBALANCEREADS: dout(-10) << "apply_repop completed unbalance-reads on " << oid << dendl; @@ -2039,7 +2043,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) if (op->noop) opname = "no-op"; else - opname = ceph_osd_op_name(op->ops[0].op); + opname = ceph_osd_op_name(op->ops[0].op.op); dout(10) << "sub_op_modify " << opname << " " << soid @@ -2342,8 +2346,8 @@ bool ReplicatedPG::pull(const sobject_t& soid) tid_t tid = osd->get_tid(); MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK, osd->osdmap->get_epoch(), tid, v); - subop->ops = vector(1); - subop->ops[0].op = CEPH_OSD_OP_PULL; + subop->ops = vector(1); + subop->ops[0].op.op = CEPH_OSD_OP_PULL; subop->data_subset.swap(data_subset); // do not include clone_subsets in pull request; we will recalculate this // when the object is pushed back. @@ -2480,13 +2484,13 @@ void ReplicatedPG::push(const sobject_t& soid, int peer, osd_reqid_t rid; // useless? MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0, osd->osdmap->get_epoch(), osd->get_tid(), oi.version); - subop->ops = vector(1); - subop->ops[0].op = CEPH_OSD_OP_PUSH; - subop->ops[0].offset = 0; - subop->ops[0].length = size; + subop->ops = vector(1); + subop->ops[0].op.op = CEPH_OSD_OP_PUSH; + subop->ops[0].op.offset = 0; + subop->ops[0].op.length = size; + subop->ops[0].data = bl; subop->data_subset.swap(data_subset); subop->clone_subsets.swap(clone_subsets); - subop->set_data(bl); // note: claims bl, set length above here! subop->attrset.swap(attrset); osd->messenger->send_message(subop, osd->osdmap->get_inst(peer)); @@ -2559,12 +2563,12 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) { const sobject_t& soid = op->poid; eversion_t v = op->version; - ceph_osd_op& push = op->ops[0]; + OSDOp& push = op->ops[0]; dout(7) << "op_push " << soid << " v " << v - << " len " << push.length + << " len " << push.op.length << " data_subset " << op->data_subset << " clone_subsets " << op->clone_subsets << " data len " << op->get_data().length() @@ -2578,8 +2582,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) // determine data/clone subsets data_subset = op->data_subset; - if (data_subset.empty() && push.length && push.length == data.length()) - data_subset.insert(0, push.length); + if (data_subset.empty() && push.op.length && push.op.length == data.length()) + data_subset.insert(0, push.op.length); clone_subsets = op->clone_subsets; if (is_primary()) { diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 1e52735b3814d..d2de83ff4f288 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -233,7 +233,7 @@ public: struct OpContext { Message *op; osd_reqid_t reqid; - vector& ops; + vector& ops; bufferlist& indata; bufferlist outdata; @@ -254,7 +254,7 @@ public: ReplicatedPG *pg; - OpContext(Message *_op, osd_reqid_t _reqid, vector& _ops, bufferlist& _data, + OpContext(Message *_op, osd_reqid_t _reqid, vector& _ops, bufferlist& _data, ObjectContext::state_t _mode, ObjectState *_obs, ReplicatedPG *_pg) : op(_op), reqid(_reqid), ops(_ops), indata(_data), mode(_mode), obs(_obs), clone_obc(0), data_off(0), pg(_pg) {} @@ -446,8 +446,8 @@ public: void do_sub_op(MOSDSubOp *op); void do_sub_op_reply(MOSDSubOpReply *op); bool snap_trimmer(); - int do_osd_ops(OpContext *ctx, vector& ops, - bufferlist::iterator& bp, bufferlist& odata); + int do_osd_ops(OpContext *ctx, vector& ops, + bufferlist& odata); bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index efc267dce1e81..9a07e9f4a85cf 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -926,4 +926,20 @@ struct ScrubMap { WRITE_CLASS_ENCODER(ScrubMap::object) WRITE_CLASS_ENCODER(ScrubMap) + +struct OSDOp { + ceph_osd_op op; + bufferlist data; + + OSDOp() { + memset(&op, 0, sizeof(ceph_osd_op)); + } +}; + +inline ostream& operator<<(ostream& out, const struct OSDOp& op) { + out << op.op << " (data length=" << op.data.length() << ")"; + return out; +} + + #endif diff --git a/src/osdc/Filer.h b/src/osdc/Filer.h index a60b8f2f1ae89..32202833c060e 100644 --- a/src/osdc/Filer.h +++ b/src/osdc/Filer.h @@ -151,12 +151,12 @@ class Filer { vector extents; file_to_extents(ino, layout, offset, len, extents); if (extents.size() == 1) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_TRIMTRUNC; - ops[0].truncate_seq = truncate_seq; - ops[0].truncate_size = extents[0].offset; - objecter->modify(extents[0].oid, extents[0].layout, ops, snapc, bl, mtime, flags, onack, oncommit); + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC; + ops[0].op.truncate_seq = truncate_seq; + ops[0].op.truncate_size = extents[0].offset; + ops[0].data = bl; + objecter->modify(extents[0].oid, extents[0].layout, ops, snapc, mtime, flags, onack, oncommit); } else { C_Gather *gack = 0, *gcom = 0; if (onack) @@ -164,12 +164,13 @@ class Filer { if (oncommit) gcom = new C_Gather(oncommit); for (vector::iterator p = extents.begin(); p != extents.end(); p++) { - vector ops(1); + vector ops(1); memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_TRIMTRUNC; - ops[0].truncate_size = p->offset; - ops[0].truncate_seq = truncate_seq; - objecter->modify(extents[0].oid, p->layout, ops, snapc, bl, mtime, flags, + ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC; + ops[0].op.truncate_size = p->offset; + ops[0].op.truncate_seq = truncate_seq; + ops[0].data = bl; + objecter->modify(extents[0].oid, p->layout, ops, snapc, mtime, flags, gack ? gack->new_sub():0, gcom ? gcom->new_sub():0); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index af5df4debeec5..ed39a7cd31a50 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -395,7 +395,6 @@ tid_t Objecter::read_submit(ReadOp *rd) m->set_snap_seq(0); m->ops = rd->ops; - m->set_data(rd->bl); m->set_retry_attempt(rd->attempts++); int who = pg.acker(); @@ -520,8 +519,6 @@ tid_t Objecter::modify_submit(ModifyOp *wr) if (wr->version != eversion_t()) m->set_version(wr->version); // we're replaying this op! - m->set_data(wr->bl); - messenger->send_message(m, osdmap->get_inst(pg.primary())); } else maybe_request_map(); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 0c006bb56967f..f26de1b803f87 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -40,45 +40,44 @@ class Message; // ----------------------------------------- struct ObjectOperation { - vector ops; - bufferlist data; + vector ops; int flags; - void add_data(int op, __u64 off, __u64 len) { + void add_data(int op, __u64 off, __u64 len, bufferlist& bl) { int s = ops.size(); ops.resize(s+1); - memset(&ops[s], 0, sizeof(ops[s])); - ops[s].op = op; - ops[s].offset = off; - ops[s].length = len; + ops[s].op.op = op; + ops[s].op.offset = off; + ops[s].op.length = len; + ops[s].data.claim_append(bl); } - void add_xattr(int op, int namelen, int valuelen) { + void add_xattr(int op, const char *name, const bufferlist& data) { int s = ops.size(); ops.resize(s+1); - memset(&ops[s], 0, sizeof(ops[s])); - ops[s].op = op; - ops[s].name_len = namelen; - ops[s].value_len = valuelen; + ops[s].op.op = op; + ops[s].op.name_len = (name ? strlen(name) : 0); + ops[s].op.value_len = data.length(); + if (name) + ops[s].data.append(name); + ops[s].data.append(data); } void add_call(int op, const char *cname, const char *method, bufferlist &indata) { int s = ops.size(); ops.resize(s+1); - memset(&ops[s], 0, sizeof(ops[s])); - ops[s].op = op; - ops[s].class_len = strlen(cname); - ops[s].method_len = strlen(method); - ops[s].indata_len = indata.length(); - data.append(cname, ops[s].class_len); - data.append(method, ops[s].method_len); - data.append(indata); + ops[s].op.op = op; + ops[s].op.class_len = strlen(cname); + ops[s].op.method_len = strlen(method); + ops[s].op.indata_len = indata.length(); + ops[s].data.append(cname, ops[s].op.class_len); + ops[s].data.append(method, ops[s].op.method_len); + ops[s].data.append(indata); } void add_pgls(int op, __u64 count, __u64 cookie) { int s = ops.size(); ops.resize(s+1); - memset(&ops[s], 0, sizeof(ops[s])); - ops[s].op = op; - ops[s].count = count; - ops[s].pgls_cookie = cookie; + ops[s].op.op = op; + ops[s].op.count = count; + ops[s].op.pgls_cookie = cookie; } ObjectOperation() : flags(0) {} @@ -86,15 +85,16 @@ struct ObjectOperation { struct ObjectRead : public ObjectOperation { void read(__u64 off, __u64 len) { - add_data(CEPH_OSD_OP_READ, off, len); + bufferlist bl; + add_data(CEPH_OSD_OP_READ, off, len, bl); } void getxattr(const char *name) { - int l = strlen(name); - add_xattr(CEPH_OSD_OP_GETXATTR, l, 0); - data.append(name, l); + bufferlist bl; + add_xattr(CEPH_OSD_OP_GETXATTR, name, bl); } void getxattrs() { - add_xattr(CEPH_OSD_OP_GETXATTRS, 0, 0); + bufferlist bl; + add_xattr(CEPH_OSD_OP_GETXATTRS, 0, bl); } void rdcall(const char *cname, const char *method, bufferlist &indata) { @@ -117,51 +117,42 @@ struct ObjectMutation : public ObjectOperation { // object data void write(__u64 off, __u64 len, bufferlist& bl) { - add_data(CEPH_OSD_OP_WRITE, off, len); - data.claim_append(bl); + add_data(CEPH_OSD_OP_WRITE, off, len, bl); } void write_full(bufferlist& bl) { - add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length()); - data.claim_append(bl); + add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl); } void zero(__u64 off, __u64 len) { - add_data(CEPH_OSD_OP_ZERO, off, len); + bufferlist bl; + add_data(CEPH_OSD_OP_ZERO, off, len, bl); } void remove() { - add_data(CEPH_OSD_OP_DELETE, 0, 0); + bufferlist bl; + add_data(CEPH_OSD_OP_DELETE, 0, 0, bl); } // object attrs void setxattr(const char *name, const bufferlist& bl) { - int l = strlen(name); - add_xattr(CEPH_OSD_OP_SETXATTR, l, bl.length()); - data.append(name, l); - data.append(bl); + add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); } void setxattr(const char *name, const string& s) { - int l = strlen(name); - add_xattr(CEPH_OSD_OP_SETXATTR, l, s.length()); - data.append(name, l); - data.append(s); + bufferlist bl; + bl.append(s); + add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); } void rmxattr(const char *name) { - int l = strlen(name); - add_xattr(CEPH_OSD_OP_RMXATTR, l, 0); - data.append(name, l); + bufferlist bl; + add_xattr(CEPH_OSD_OP_RMXATTR, name, bl); } void setxattrs(map& attrs) { bufferlist bl; ::encode(attrs, bl); add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length()); - data.claim_append(bl); } void resetxattrs(const char *prefix, map& attrs) { - int l = strlen(prefix); bufferlist bl; ::encode(attrs, bl); - add_xattr(CEPH_OSD_OP_RESETXATTRS, l, bl.length()); - data.append(prefix, l); - data.claim_append(bl); + add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl); } }; @@ -207,9 +198,8 @@ class Objecter { struct ReadOp { object_t oid; ceph_object_layout layout; - vector ops; + vector ops; snapid_t snap; - bufferlist bl; bufferlist *pbl; int flags; @@ -220,7 +210,7 @@ class Objecter { bool paused; - ReadOp(const object_t& o, ceph_object_layout& ol, vector& op, snapid_t s, int f, Context *of) : + ReadOp(const object_t& o, ceph_object_layout& ol, vector& op, snapid_t s, int f, Context *of) : oid(o), layout(ol), snap(s), pbl(0), flags(f), onfinish(of), tid(0), attempts(0), @@ -234,7 +224,7 @@ class Objecter { object_t oid; ceph_object_layout layout; SnapContext snapc; - vector ops; + vector ops; utime_t mtime; bufferlist bl; int flags; @@ -246,7 +236,7 @@ class Objecter { bool paused; - ModifyOp(const object_t& o, ceph_object_layout& l, vector& op, utime_t mt, + ModifyOp(const object_t& o, ceph_object_layout& l, vector& op, utime_t mt, const SnapContext& sc, int f, Context *ac, Context *co) : oid(o), layout(l), snapc(sc), mtime(mt), flags(f), onack(ac), oncommit(co), tid(0), attempts(0), @@ -360,7 +350,7 @@ class Objecter { tid_t read_submit(ReadOp *rd); tid_t modify_submit(ModifyOp *wr); - tid_t read(const object_t& oid, ceph_object_layout ol, vector& ops, + tid_t read(const object_t& oid, ceph_object_layout ol, vector& ops, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish) { ReadOp *rd = new ReadOp(oid, ol, ops, snap, flags, onfinish); @@ -370,16 +360,14 @@ class Objecter { tid_t read(const object_t& oid, ceph_object_layout ol, ObjectRead& read, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish) { ReadOp *rd = new ReadOp(oid, ol, read.ops, snap, read.flags | flags, onfinish); - rd->bl = read.data; rd->pbl = pbl; return read_submit(rd); } - tid_t modify(const object_t& oid, ceph_object_layout ol, vector& ops, - const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, + tid_t modify(const object_t& oid, ceph_object_layout ol, vector& ops, + const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit) { ModifyOp *wr = new ModifyOp(oid, ol, ops, mtime, snapc, flags, onack, oncommit); - wr->bl = bl; return modify_submit(wr); } @@ -387,9 +375,8 @@ class Objecter { tid_t stat(const object_t& oid, ceph_object_layout ol, snapid_t snap, __u64 *psize, utime_t *pmtime, int flags, Context *onfinish) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_STAT; + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_STAT; C_Stat *fin = new C_Stat(psize, pmtime, onfinish); return read(oid, ol, ops, snap, &fin->bl, flags, fin); } @@ -397,11 +384,10 @@ class Objecter { tid_t read(const object_t& oid, ceph_object_layout ol, __u64 off, size_t len, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_READ; - ops[0].offset = off; - ops[0].length = len; + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_READ; + ops[0].op.offset = off; + ops[0].op.length = len; return read(oid, ol, ops, snap, pbl, flags, onfinish); } tid_t read_full(const object_t& oid, ceph_object_layout ol, @@ -414,52 +400,49 @@ class Objecter { ObjectMutation& mutation, const SnapContext& snapc, int flags, Context *onack, Context *oncommit) { - return modify(oid, ol, mutation.ops, snapc, mutation.data, mutation.mtime, flags, onack, oncommit); + return modify(oid, ol, mutation.ops, snapc, mutation.mtime, flags, onack, oncommit); } tid_t write(const object_t& oid, ceph_object_layout ol, __u64 off, size_t len, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, Context *onack, Context *oncommit) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_WRITE; - ops[0].offset = off; - ops[0].length = len; - return modify(oid, ol, ops, snapc, bl, mtime, flags, onack, oncommit); + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_WRITE; + ops[0].op.offset = off; + ops[0].op.length = len; + ops[0].data = bl; + return modify(oid, ol, ops, snapc, mtime, flags, onack, oncommit); } tid_t write_full(const object_t& oid, ceph_object_layout ol, const SnapContext& snapc, bufferlist &bl, utime_t mtime, int flags, Context *onack, Context *oncommit) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_WRITEFULL; - ops[0].offset = 0; - ops[0].length = bl.length(); - return modify(oid, ol, ops, snapc, bl, mtime, flags, onack, oncommit); + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_WRITEFULL; + ops[0].op.offset = 0; + ops[0].op.length = bl.length(); + ops[0].data = bl; + return modify(oid, ol, ops, snapc, mtime, flags, onack, oncommit); } tid_t zero(const object_t& oid, ceph_object_layout ol, __u64 off, size_t len, const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_ZERO; - ops[0].offset = off; - ops[0].length = len; + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_ZERO; + ops[0].op.offset = off; + ops[0].op.length = len; return modify_submit(new ModifyOp(oid, ol, ops, mtime, snapc, flags, onack, oncommit)); } tid_t remove(const object_t& oid, ceph_object_layout ol, const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_DELETE; + vector ops(1); + ops[0].op.op = CEPH_OSD_OP_DELETE; return modify_submit(new ModifyOp(oid, ol, ops, mtime, snapc, flags, onack, oncommit)); } tid_t lock(const object_t& oid, ceph_object_layout ol, int op, int flags, Context *onack, Context *oncommit) { SnapContext snapc; // no snapc for lock ops - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = op; + vector ops(1); + ops[0].op.op = op; return modify_submit(new ModifyOp(oid, ol, ops, utime_t(), snapc, flags, onack, oncommit)); } -- 2.39.5