#define CEPH_OSD_PROTOCOL 5 /* cluster internal */
#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 4 /* cluster internal */
-#define CEPH_OSDC_PROTOCOL 11 /* public/client */
+#define CEPH_OSDC_PROTOCOL 12 /* public/client */
#define CEPH_MDSC_PROTOCOL 20 /* public/client */
#define CEPH_MONC_PROTOCOL 12 /* public/client */
/*
* osd op flags
+ *
+ * An op may be READ, WRITE, or READ|WRITE.
*/
enum {
CEPH_OSD_FLAG_ACK = 1, /* want (or is) "ack" ack */
CEPH_OSD_FLAG_ONNVRAM = 2, /* want (or is) "onnvram" ack */
CEPH_OSD_FLAG_ONDISK = 4, /* want (or is) "ondisk" ack */
CEPH_OSD_FLAG_RETRY = 8, /* resend attempt */
- CEPH_OSD_FLAG_MODIFY = 32, /* op is/was a mutation */
+ CEPH_OSD_FLAG_READ = 16, /* op may read */
+ CEPH_OSD_FLAG_WRITE = 32, /* op may write */
CEPH_OSD_FLAG_ORDERSNAP = 64, /* EOLDSNAP if snapc is out of order */
CEPH_OSD_FLAG_PEERSTAT = 128, /* msg includes osd_peer_stat */
CEPH_OSD_FLAG_BALANCE_READS = 256,
ceph_vino(inode),
offset, &len,
CEPH_OSD_OP_WRITE,
- CEPH_OSD_FLAG_MODIFY |
+ CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, do_sync,
ci->i_truncate_seq,
flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK |
- CEPH_OSD_FLAG_MODIFY;
+ CEPH_OSD_FLAG_WRITE;
if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
flags |= CEPH_OSD_FLAG_ACK;
else
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
- req->r_flags = flags & CEPH_OSD_FLAG_MODIFY;
+ req->r_flags = flags;
req->r_last_osd = -1;
+ /* every request must declare READ and/or WRITE; test the FLAG bits, not the OP opcodes */
+ WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
+
/* create message */
if (snapc)
msg_size += sizeof(u64) * snapc->num_snaps;
head->client_inc = cpu_to_le32(1); /* always, for now. */
head->flags = cpu_to_le32(flags);
- if (flags & CEPH_OSD_FLAG_MODIFY)
+ if (flags & CEPH_OSD_FLAG_WRITE)
ceph_encode_timespec(&head->mtime, mtime);
head->num_ops = cpu_to_le16(num_op);
op->op = cpu_to_le16(opcode);
calc_layout(osdc, vino, layout, off, plen, req);
req->r_file_layout = *layout; /* keep a copy */
- if (flags & CEPH_OSD_FLAG_MODIFY) {
+ if (flags & CEPH_OSD_FLAG_WRITE) {
req->r_request->hdr.data_off = cpu_to_le16(off);
req->r_request->hdr.data_len = cpu_to_le32(*plen);
}
/* either this is a read, or we got the safe response */
if ((flags & CEPH_OSD_FLAG_ONDISK) ||
- ((flags & CEPH_OSD_FLAG_MODIFY) == 0))
+ ((flags & CEPH_OSD_FLAG_WRITE) == 0))
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
break;
next_tid = req->r_tid + 1;
- if ((req->r_flags & CEPH_OSD_FLAG_MODIFY) == 0)
+ if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
continue;
ceph_osdc_get_request(req);
dout(10, "readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, len);
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
- CEPH_OSD_OP_READ, 0, NULL, 0,
- truncate_seq, truncate_size, NULL);
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+ NULL, 0, truncate_seq, truncate_size, NULL);
if (IS_ERR(req))
return PTR_ERR(req);
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
CEPH_OSD_OP_WRITE,
flags | CEPH_OSD_FLAG_ONDISK |
- CEPH_OSD_FLAG_MODIFY,
+ CEPH_OSD_FLAG_WRITE,
snapc, do_sync,
truncate_seq, truncate_size, mtime);
if (IS_ERR(req))
utime_t get_mtime() { return head.mtime; }
- bool is_modify() { return head.flags & CEPH_OSD_FLAG_MODIFY; }
+ bool may_read() { return head.flags & CEPH_OSD_FLAG_READ; }  // true when the request's flags carry the READ ("op may read") bit
+ bool may_write() { return head.flags & CEPH_OSD_FLAG_WRITE; }  // true when the request's flags carry the WRITE ("op may write") bit
void set_peer_stat(const osd_peer_stat_t& stat) {
peer_stat = stat;
void print(ostream& out) {
out << "osd_op(" << get_reqid();
out << " " << head.oid;
- if (!is_modify())
- out << " @" << snapid_t((__u64)head.snapid);
+
+ out << " ";
+ if (may_read())
+ out << "r";
+ if (may_write())
+ out << "w";
+ else
+ out << "@" << snapid_t((__u64)head.snapid);
+
out << " " << ops;
out << " " << pg_t(head.layout.ol_pgid);
if (is_retry_attempt()) out << " RETRY";
__s32 get_result() { return head.result; }
eversion_t get_version() { return head.reassert_version; }
- bool is_modify() { return head.flags & CEPH_OSD_FLAG_MODIFY; }
+ bool may_read() { return head.flags & CEPH_OSD_FLAG_READ; }  // true when the reply's flags carry the READ bit
+ bool may_write() { return head.flags & CEPH_OSD_FLAG_WRITE; }  // true when the reply's flags carry the WRITE bit
void set_result(int r) { head.result = r; }
void set_version(eversion_t v) { head.reassert_version = v; }
void print(ostream& out) {
out << "osd_op_reply(" << get_reqid()
<< " " << head.oid << " " << ops;
- if (is_modify()) {
+ if (may_write()) {
if (is_ondisk())
out << " ondisk";
else if (is_onnvram())
stat_oprate.hit(now);
stat_ops++;
stat_qlen += pending_ops;
- if (!op->is_modify()) {
+ if (!op->may_write()) {
stat_rd_ops++;
if (op->get_source().is_osd()) {
//derr(-10) << "shed in " << stat_rd_ops_shed_in << " / " << stat_rd_ops << dendl;
}
// pg must be same-ish...
- if (!op->is_modify()) {
+ if (!op->may_write()) {
// read
if (!pg->same_for_read_since(op->get_map_epoch())) {
dout(7) << "handle_rep_op pg changed " << pg->info.history
return;
}
- if (!op->is_modify()) {
+ if (!op->may_write()) {
Mutex::Locker lock(peer_stat_lock);
stat_rd_ops_in_queue++;
}
bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
{
// we only care about reads here on out..
- if (op->is_modify() ||
- op->ops.size() < 1 ||
- op->is_modify())
+ if (op->may_write() ||
+ op->ops.size() < 1)
return false;
+
ceph_osd_op& readop = op->ops[0];
object_t oid = op->get_oid();
osd->logger->inc(l_osd_op);
- if (op->is_modify())
+ if (op->may_write())
op_modify(op);
else
op_read(op);
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
- if (m->is_modify())
+ if (m->may_write())
handle_osd_modify_reply(m);
else
handle_osd_read_reply(m);
flags |= CEPH_OSD_FLAG_ACK;
MOSDOp *m = new MOSDOp(client_inc, last_tid,
rd->oid, rd->layout, osdmap->get_epoch(),
- flags);
+ flags | CEPH_OSD_FLAG_READ);
m->set_snapid(rd->snap);
m->ops = rd->ops;
m->set_data(rd->bl);
} else if (pg.primary() >= 0) {
MOSDOp *m = new MOSDOp(client_inc, wr->tid,
wr->oid, wr->layout, osdmap->get_epoch(),
- flags | CEPH_OSD_FLAG_MODIFY);
+ flags | CEPH_OSD_FLAG_WRITE);
m->ops = wr->ops;
m->set_mtime(wr->mtime);
m->set_snap_seq(wr->snapc.seq);