From: Sage Weil Date: Mon, 10 Nov 2008 18:36:24 +0000 (-0800) Subject: osd: MODIFY is a flag; fix up op_read X-Git-Tag: v0.5~35 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=48688a2f3b5609d7bc71871b393a6f483abdfa3b;p=ceph.git osd: MODIFY is a flag; fix up op_read --- diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 5c8ea57ea28..9954419d869 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -1082,10 +1082,10 @@ enum { CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6, /* fancy read */ - CEPH_OSD_OP_GREP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1, + CEPH_OSD_OP_GREP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3, /* fancy write */ - CEPH_OSD_OP_APPEND = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, + CEPH_OSD_OP_APPEND = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6, }; static inline int ceph_osd_op_type_lock(int op) @@ -1156,10 +1156,11 @@ enum { CEPH_OSD_OP_SAFE = 2, /* want (or is) "safe" ack */ CEPH_OSD_OP_RETRY = 4, /* resend attempt */ CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */ - CEPH_OSD_OP_BALANCE_READS = 16, + CEPH_OSD_OP_MODIFY = 16, /* op is/was a mutation */ CEPH_OSD_OP_ACKNVRAM = 32, /* ACK when stable in NVRAM, not RAM */ CEPH_OSD_OP_ORDERSNAP = 64, /* EOLDSNAP if snapc is out of order */ CEPH_OSD_OP_PEERSTAT = 128, /* msg includes osd_peer_stat */ + CEPH_OSD_OP_BALANCE_READS = 256, }; #define EOLDSNAPC 44 /* ORDERSNAP flag set and writer has old snap context*/ @@ -1195,8 +1196,7 @@ struct ceph_osd_request_head { /* read or mutation */ __le16 num_ops; - __u8 is_modify; - __u8 object_type; + __u16 object_type; struct ceph_osd_op ops[]; /* followed by snaps */ } __attribute__ ((packed)); @@ -1210,8 +1210,7 @@ struct ceph_osd_reply_head { __le32 result; - __le16 num_ops; - __le16 is_modify; + __le32 num_ops; struct ceph_osd_op ops[0]; } __attribute__ ((packed)); diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 06e965fec43..5fe4291f260 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -56,7 +56,7 @@ public: eversion_t get_version() { return head.reassert_version; } - bool is_modify() { return head.is_modify; } + bool is_modify() { return head.flags & CEPH_OSD_OP_MODIFY; } unsigned get_inc_lock() const { return head.inc_lock; } @@ -74,14 +74,13 @@ public: - MOSDOp(int inc, long tid, bool modify, + MOSDOp(int inc, long tid, object_t oid, ceph_object_layout ol, epoch_t mapepoch, int flags) : Message(CEPH_MSG_OSD_OP) { memset(&head, 0, sizeof(head)); head.tid = tid; head.client_inc = inc; - head.is_modify = modify; head.oid = oid; head.layout = ol; head.osdmap_epoch = mapepoch; diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h index 3526b23f349..246a71b904e 100644 --- a/src/messages/MOSDOpReply.h +++ b/src/messages/MOSDOpReply.h @@ -43,7 +43,7 @@ class MOSDOpReply : public Message { __s32 get_result() { return head.result; } eversion_t get_version() { return head.reassert_version; } - bool is_modify() { return head.is_modify; } + bool is_modify() { return head.flags & CEPH_OSD_OP_MODIFY; } void set_result(int r) { head.result = r; } void set_version(eversion_t v) { head.reassert_version = v; } @@ -57,10 +57,11 @@ public: Message(CEPH_MSG_OSD_OPREPLY) { memset(&head, 0, sizeof(head)); head.tid = req->head.tid; - head.is_modify = req->is_modify(); ops = req->ops; head.result = result; - head.flags = commit ? CEPH_OSD_OP_SAFE:0; + head.flags = + (req->head.flags & ~(CEPH_OSD_OP_SAFE|CEPH_OSD_OP_ACK)) | + (commit ? CEPH_OSD_OP_SAFE:CEPH_OSD_OP_ACK); head.oid = req->head.oid; head.layout = req->head.layout; head.osdmap_epoch = e; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index cec74ec80ae..743fae95964 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -171,11 +171,11 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now) ceph_object_layout layout; layout.ol_pgid = info.pgid.u.pg64; layout.ol_stripe_unit = 0; - MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true, + MOSDOp *pop = new MOSDOp(0, osd->get_tid(), oid, layout, osd->osdmap->get_epoch(), - 0); + CEPH_OSD_OP_MODIFY); pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0); do_op(pop); } @@ -186,11 +186,11 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now) ceph_object_layout layout; layout.ol_pgid = info.pgid.u.pg64; layout.ol_stripe_unit = 0; - MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true, + MOSDOp *pop = new MOSDOp(0, osd->get_tid(), oid, layout, osd->osdmap->get_epoch(), - 0); + CEPH_OSD_OP_MODIFY); pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0); do_op(pop); } @@ -626,19 +626,16 @@ void ReplicatedPG::op_read(MOSDOp *op) object_t oid = op->get_oid(); pobject_t poid(info.pgid.pool(), 0, oid); - ceph_osd_op& readop = op->ops[0]; - - dout(10) << "op_read " << ceph_osd_op_name(readop.op) - << " " << oid - << " " << readop.offset << "~" << readop.length - << dendl; + dout(10) << "op_read " << oid << " " << op->ops << dendl; // wrlocked? if (block_if_wrlocked(op)) return; - // taking read shedding out for now, because i don't want to embed - // the osd_peer_stat in MOSDOp + + bufferlist data; + int data_off = 0; + int result = 0; // !primary and unbalanced? // (ignore ops forwarded from the primary) @@ -683,14 +680,10 @@ void ReplicatedPG::op_read(MOSDOp *op) } } - // set up reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); - long r = 0; - // do it. if (poid.oid.snap && !pick_read_snap(poid)) { // we have no revision for this request. - r = -ENOENT; + result = -ENOENT; goto done; } @@ -701,50 +694,68 @@ void ReplicatedPG::op_read(MOSDOp *op) if (cur > op->get_inc_lock()) { dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock() << " on " << poid << dendl; - r = -EINCLOCKED; + result = -EINCLOCKED; goto done; } } - - switch (readop.op) { - case CEPH_OSD_OP_READ: - { - // read into a buffer - bufferlist bl; - r = osd->store->read(info.pgid.to_coll(), poid, - readop.offset, readop.length, - bl); - reply->set_data(bl); - reply->get_header().data_off = readop.offset; - if (r >= 0) - reply->ops[0].length = r; - else - reply->ops[0].length = 0; - dout(10) << " read got " << r << " / " << readop.length << " bytes from obj " << oid << dendl; - } - osd->logger->inc("c_rd"); - osd->logger->inc("c_rdb", reply->ops[0].length); - break; - - case CEPH_OSD_OP_STAT: - { - struct stat st; - memset(&st, sizeof(st), 0); - r = osd->store->stat(info.pgid.to_coll(), poid, &st); - if (r >= 0) - reply->ops[0].length = st.st_size; + + for (vector::iterator p = op->ops.begin(); p != op->ops.end(); p++) { + switch (p->op) { + case CEPH_OSD_OP_READ: + { + // read into a buffer + bufferlist bl; + int r = osd->store->read(info.pgid.to_coll(), poid, p->offset, p->length, bl); + if (data.length() == 0) + data_off = p->offset; + data.claim(bl); + if (r >= 0) + p->length = r; + else { + result = r; + p->length = 0; + } + dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << oid << dendl; + } + osd->logger->inc("c_rd"); + osd->logger->inc("c_rdb", p->length); + break; + + case CEPH_OSD_OP_STAT: + { + struct stat st; + memset(&st, sizeof(st), 0); + int r = osd->store->stat(info.pgid.to_coll(), poid, &st); + if (r >= 0) + p->length = st.st_size; + else + result = r; + } + break; + + case CEPH_OSD_OP_GREP: + { + + } + break; + + default: + dout(1) << "unrecognized osd op " << p->op + << " " << ceph_osd_op_name(p->op) + << dendl; + result = -EOPNOTSUPP; + assert(0); // for now } - break; - - default: - assert(0); } - done: - if (r >= 0) { - reply->set_result(0); + // reply + MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); + reply->set_data(data); + reply->get_header().data_off = data_off; + reply->set_result(result); + if (result >= 0) { utime_t now = g_clock.now(); utime_t diff = now; diff -= op->get_recv_stamp(); @@ -756,12 +767,8 @@ void ReplicatedPG::op_read(MOSDOp *op) if (is_primary() && g_conf.osd_balance_reads) stat_object_temp_rd[oid].hit(now); // hit temp. - - } else { - reply->set_result(r); // error } - - // send it + osd->messenger->send_message(reply, op->get_orig_source_inst()); delete op; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 8dbbd98be68..71a8a597e4f 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -350,7 +350,7 @@ tid_t Objecter::read_submit(ReadOp *rd) int flags = rd->flags; if (rd->onfinish) flags |= CEPH_OSD_OP_ACK; - MOSDOp *m = new MOSDOp(client_inc, last_tid, false, + MOSDOp *m = new MOSDOp(client_inc, last_tid, rd->oid, rd->layout, osdmap->get_epoch(), flags); m->ops = rd->ops; @@ -425,11 +425,11 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) if (rd->pbl) rd->pbl->claim(m->get_data()); - if (rd->psize) { - ceph_osd_op& op = m->ops[0]; - *(rd->psize) = op.length; - } - + if (rd->psize) + for (vector::iterator p = m->ops.begin(); p != m->ops.end(); p++) + if (p->op == CEPH_OSD_OP_STAT) + *(rd->psize) = p->length; + // finish, clean up Context *onfinish = rd->onfinish; dout(7) << " " << bytes_read << " bytes " << dendl; @@ -482,9 +482,9 @@ tid_t Objecter::modify_submit(ModifyOp *wr) << " osd" << pg.primary() << dendl; if (pg.primary() >= 0) { - MOSDOp *m = new MOSDOp(client_inc, wr->tid, true, + MOSDOp *m = new MOSDOp(client_inc, wr->tid, wr->oid, wr->layout, osdmap->get_epoch(), - flags); + flags | CEPH_OSD_OP_MODIFY); m->ops = wr->ops; m->set_snap_seq(wr->snapc.seq); m->get_snaps() = wr->snapc.snaps;