CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
/* fancy read */
- CEPH_OSD_OP_GREP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
+ CEPH_OSD_OP_GREP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
/* fancy write */
- CEPH_OSD_OP_APPEND = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
+ CEPH_OSD_OP_APPEND = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
};
static inline int ceph_osd_op_type_lock(int op)
CEPH_OSD_OP_SAFE = 2, /* want (or is) "safe" ack */
CEPH_OSD_OP_RETRY = 4, /* resend attempt */
CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */
- CEPH_OSD_OP_BALANCE_READS = 16,
+ CEPH_OSD_OP_MODIFY = 16, /* op is/was a mutation */
CEPH_OSD_OP_ACKNVRAM = 32, /* ACK when stable in NVRAM, not RAM */
CEPH_OSD_OP_ORDERSNAP = 64, /* EOLDSNAP if snapc is out of order */
CEPH_OSD_OP_PEERSTAT = 128, /* msg includes osd_peer_stat */
+ CEPH_OSD_OP_BALANCE_READS = 256,
};
#define EOLDSNAPC 44 /* ORDERSNAP flag set and writer has old snap context*/
/* read or mutation */
__le16 num_ops;
- __u8 is_modify;
- __u8 object_type;
+ __u16 object_type;
struct ceph_osd_op ops[]; /* followed by snaps */
} __attribute__ ((packed));
__le32 result;
- __le16 num_ops;
- __le16 is_modify;
+ __le32 num_ops;
struct ceph_osd_op ops[0];
} __attribute__ ((packed));
eversion_t get_version() { return head.reassert_version; }
- bool is_modify() { return head.is_modify; }
+ bool is_modify() { return head.flags & CEPH_OSD_OP_MODIFY; }
unsigned get_inc_lock() const { return head.inc_lock; }
- MOSDOp(int inc, long tid, bool modify,
+ MOSDOp(int inc, long tid,
object_t oid, ceph_object_layout ol, epoch_t mapepoch,
int flags) :
Message(CEPH_MSG_OSD_OP) {
memset(&head, 0, sizeof(head));
head.tid = tid;
head.client_inc = inc;
- head.is_modify = modify;
head.oid = oid;
head.layout = ol;
head.osdmap_epoch = mapepoch;
__s32 get_result() { return head.result; }
eversion_t get_version() { return head.reassert_version; }
- bool is_modify() { return head.is_modify; }
+ bool is_modify() { return head.flags & CEPH_OSD_OP_MODIFY; }
void set_result(int r) { head.result = r; }
void set_version(eversion_t v) { head.reassert_version = v; }
Message(CEPH_MSG_OSD_OPREPLY) {
memset(&head, 0, sizeof(head));
head.tid = req->head.tid;
- head.is_modify = req->is_modify();
ops = req->ops;
head.result = result;
- head.flags = commit ? CEPH_OSD_OP_SAFE:0;
+ head.flags =
+ (req->head.flags & ~(CEPH_OSD_OP_SAFE|CEPH_OSD_OP_ACK)) |
+ (commit ? CEPH_OSD_OP_SAFE:CEPH_OSD_OP_ACK);
head.oid = req->head.oid;
head.layout = req->head.layout;
head.osdmap_epoch = e;
ceph_object_layout layout;
layout.ol_pgid = info.pgid.u.pg64;
layout.ol_stripe_unit = 0;
- MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
+ MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
oid,
layout,
osd->osdmap->get_epoch(),
- 0);
+ CEPH_OSD_OP_MODIFY);
pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0);
do_op(pop);
}
ceph_object_layout layout;
layout.ol_pgid = info.pgid.u.pg64;
layout.ol_stripe_unit = 0;
- MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
+ MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
oid,
layout,
osd->osdmap->get_epoch(),
- 0);
+ CEPH_OSD_OP_MODIFY);
pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0);
do_op(pop);
}
object_t oid = op->get_oid();
pobject_t poid(info.pgid.pool(), 0, oid);
- ceph_osd_op& readop = op->ops[0];
-
- dout(10) << "op_read " << ceph_osd_op_name(readop.op)
- << " " << oid
- << " " << readop.offset << "~" << readop.length
- << dendl;
+ dout(10) << "op_read " << oid << " " << op->ops << dendl;
// wrlocked?
if (block_if_wrlocked(op))
return;
- // taking read shedding out for now, because i don't want to embed
- // the osd_peer_stat in MOSDOp
+
+ bufferlist data;
+ int data_off = 0;
+ int result = 0;
// !primary and unbalanced?
// (ignore ops forwarded from the primary)
}
}
- // set up reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
- long r = 0;
-
// do it.
if (poid.oid.snap && !pick_read_snap(poid)) {
// we have no revision for this request.
- r = -ENOENT;
+ result = -ENOENT;
goto done;
}
if (cur > op->get_inc_lock()) {
dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock()
<< " on " << poid << dendl;
- r = -EINCLOCKED;
+ result = -EINCLOCKED;
goto done;
}
}
-
- switch (readop.op) {
- case CEPH_OSD_OP_READ:
- {
- // read into a buffer
- bufferlist bl;
- r = osd->store->read(info.pgid.to_coll(), poid,
- readop.offset, readop.length,
- bl);
- reply->set_data(bl);
- reply->get_header().data_off = readop.offset;
- if (r >= 0)
- reply->ops[0].length = r;
- else
- reply->ops[0].length = 0;
- dout(10) << " read got " << r << " / " << readop.length << " bytes from obj " << oid << dendl;
- }
- osd->logger->inc("c_rd");
- osd->logger->inc("c_rdb", reply->ops[0].length);
- break;
-
- case CEPH_OSD_OP_STAT:
- {
- struct stat st;
- memset(&st, sizeof(st), 0);
- r = osd->store->stat(info.pgid.to_coll(), poid, &st);
- if (r >= 0)
- reply->ops[0].length = st.st_size;
+
+ for (vector<ceph_osd_op>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+ switch (p->op) {
+ case CEPH_OSD_OP_READ:
+ {
+ // read into a buffer
+ bufferlist bl;
+ int r = osd->store->read(info.pgid.to_coll(), poid, p->offset, p->length, bl);
+ if (data.length() == 0)
+ data_off = p->offset;
+ data.claim(bl);
+ if (r >= 0)
+ p->length = r;
+ else {
+ result = r;
+ p->length = 0;
+ }
+ dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << oid << dendl;
+ }
+ osd->logger->inc("c_rd");
+ osd->logger->inc("c_rdb", p->length);
+ break;
+
+ case CEPH_OSD_OP_STAT:
+ {
+ struct stat st;
+ memset(&st, sizeof(st), 0);
+ int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+ if (r >= 0)
+ p->length = st.st_size;
+ else
+ result = r;
+ }
+ break;
+
+ case CEPH_OSD_OP_GREP:
+ {
+
+ }
+ break;
+
+ default:
+ dout(1) << "unrecognized osd op " << p->op
+ << " " << ceph_osd_op_name(p->op)
+ << dendl;
+ result = -EOPNOTSUPP;
+ assert(0); // for now
}
- break;
-
- default:
- assert(0);
}
-
done:
- if (r >= 0) {
- reply->set_result(0);
+ // reply
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true);
+ reply->set_data(data);
+ reply->get_header().data_off = data_off;
+ reply->set_result(result);
+ if (result >= 0) {
utime_t now = g_clock.now();
utime_t diff = now;
diff -= op->get_recv_stamp();
if (is_primary() &&
g_conf.osd_balance_reads)
stat_object_temp_rd[oid].hit(now); // hit temp.
-
- } else {
- reply->set_result(r); // error
}
-
- // send it
+
osd->messenger->send_message(reply, op->get_orig_source_inst());
delete op;
int flags = rd->flags;
if (rd->onfinish)
flags |= CEPH_OSD_OP_ACK;
- MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
+ MOSDOp *m = new MOSDOp(client_inc, last_tid,
rd->oid, rd->layout, osdmap->get_epoch(),
flags);
m->ops = rd->ops;
if (rd->pbl)
rd->pbl->claim(m->get_data());
- if (rd->psize) {
- ceph_osd_op& op = m->ops[0];
- *(rd->psize) = op.length;
- }
-
+ if (rd->psize)
+ for (vector<ceph_osd_op>::iterator p = m->ops.begin(); p != m->ops.end(); p++)
+ if (p->op == CEPH_OSD_OP_STAT)
+ *(rd->psize) = p->length;
+
// finish, clean up
Context *onfinish = rd->onfinish;
dout(7) << " " << bytes_read << " bytes " << dendl;
<< " osd" << pg.primary()
<< dendl;
if (pg.primary() >= 0) {
- MOSDOp *m = new MOSDOp(client_inc, wr->tid, true,
+ MOSDOp *m = new MOSDOp(client_inc, wr->tid,
wr->oid, wr->layout, osdmap->get_epoch(),
- flags);
+ flags | CEPH_OSD_OP_MODIFY);
m->ops = wr->ops;
m->set_snap_seq(wr->snapc.seq);
m->get_snaps() = wr->snapc.snaps;