}
-void ReplicatedPG::op_read(MOSDOp *op)
+int ReplicatedPG::do_read_ops(MOSDOp *op, sobject_t& soid, object_info_t& oi,
+ vector<ceph_osd_op> &ops, bufferlist::iterator& bp,
+ bufferlist& data,
+ int *data_off)
{
- object_t oid = op->get_oid();
- sobject_t soid(oid, op->get_snapid());
-
- dout(10) << "op_read " << soid << " " << op->ops << dendl;
-
- bufferlist::iterator bp = op->get_data().begin();
- bufferlist data;
- int data_off = 0;
int result = 0;
- // pick revision
- object_info_t oi(soid);
- if (soid.snap) {
- result = pick_read_snap(soid, oi);
- if (result == -EAGAIN) {
- wait_for_missing_object(soid, op);
- return;
- }
- if (result != 0)
- goto done; // we have no revision for this request.
- }
-
- // wrlocked?
- if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) &&
- block_if_wrlocked(op, oi))
- return;
-
-
- // !primary and unbalanced?
- // (ignore ops forwarded from the primary)
- if (!is_primary()) {
- if (op->get_source().is_osd() &&
- op->get_source().num() == get_primary()) {
- // read was shed to me by the primary
- int from = op->get_source().num();
- assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT);
- osd->take_peer_stat(from, op->get_peer_stat());
- dout(10) << "read shed IN from " << op->get_source()
- << " " << op->get_reqid()
- << ", me = " << osd->my_stat.read_latency_mine
- << ", them = " << op->get_peer_stat().read_latency
- << (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"")
- << dendl;
- osd->logger->inc(l_osd_shdin);
-
- // does it look like they were wrong to do so?
- Mutex::Locker lock(osd->peer_stat_lock);
- if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency &&
- osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) {
- dout(-10) << "read shed IN from " << op->get_source()
- << " " << op->get_reqid()
- << " and me " << osd->my_stat.read_latency_mine
- << " > them " << op->get_peer_stat().read_latency
- << ", but they didn't know better, sharing" << dendl;
- osd->my_stat_on_peer[from] = osd->my_stat;
- /*
- osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(),
- osd->my_stat),
- osd->osdmap->get_inst(from));
- */
- }
- } else {
- // make sure i exist and am balanced, otherwise fw back to acker.
- bool b;
- if (!osd->store->exists(info.pgid.to_coll(), soid) ||
- osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) < 0) {
- dout(-10) << "read on replica, object " << soid
- << " dne or no balance-reads, fw back to primary" << dendl;
- osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
- return;
- }
- }
- }
-
- // do it.
- for (vector<ceph_osd_op>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+ for (vector<ceph_osd_op>::iterator p = ops.begin(); p != ops.end(); p++) {
switch (p->op) {
case CEPH_OSD_OP_READ:
{
// read into a buffer
bufferlist bl;
int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl);
- if (data.length() == 0)
- data_off = p->offset;
+ if (data.length() == 0 && data_off)
+ *data_off = p->offset;
data.claim(bl);
if (r >= 0)
p->length = r;
result = r;
p->length = 0;
}
- dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << oid << dendl;
+ dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << soid << dendl;
}
osd->logger->inc(l_osd_c_rd);
osd->logger->inc(l_osd_c_rdb, p->length);
break;
-
+
case CEPH_OSD_OP_RDCALL:
{
string cname, mname;
result = r;
}
break;
-
+
case CEPH_OSD_OP_GREP:
- {
-
- }
break;
-
+
case CEPH_OSD_OP_MASKTRUNC:
if (p != op->ops.begin()) {
ceph_osd_op& rd = *(p - 1);
ceph_osd_op& m = *p;
-
+
// are we beyond truncate_size?
if (rd.offset + rd.length > m.truncate_size) {
__u32 seq = 0;
::decode(seq, p);
::decode(tm, p);
}
-
+
// truncated portion of the read
unsigned from = MAX(rd.offset, m.truncate_size); // also end of data
unsigned to = rd.offset + rd.length;
- unsigned trim = to-from;
-
+ unsigned trim = to-from;
+
rd.length = rd.length - trim;
-
+
dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl;
-
+
bufferlist keep;
keep.substr_of(data, 0, data.length() - trim);
bufferlist truncated; // everthing after 'from'
truncated.substr_of(data, data.length() - trim, trim);
keep.swap(data);
-
+
if (seq == rd.truncate_seq) {
// keep any valid extents beyond 'from'
unsigned data_end = from;
rd.length = rd.length + bp.length();
data_end += bp.length();
}
-
+
bufferlist b;
b.substr_of(truncated, s-from, l);
dout(20) << " adding " << b.length() << " bytes from " << s << "~" << l << dendl;
result = -EOPNOTSUPP;
assert(0); // for now
}
+ if (result)
+ break;
}
-
+ return result;
+}
+
+void ReplicatedPG::op_read(MOSDOp *op)
+{
+ object_t oid = op->get_oid();
+ sobject_t soid(oid, op->get_snapid());
+
+ dout(10) << "op_read " << soid << " " << op->ops << dendl;
+
+ bufferlist::iterator bp = op->get_data().begin();
+ bufferlist data;
+ int data_off = 0;
+ int result = 0;
+
+ // pick revision
+ object_info_t oi(soid);
+ if (soid.snap) {
+ result = pick_read_snap(soid, oi);
+ if (result == -EAGAIN) {
+ wait_for_missing_object(soid, op);
+ return;
+ }
+ if (result != 0)
+ goto done; // we have no revision for this request.
+ }
+
+ // wrlocked?
+ if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) &&
+ block_if_wrlocked(op, oi))
+ return;
+
+
+ // !primary and unbalanced?
+ // (ignore ops forwarded from the primary)
+ if (!is_primary()) {
+ if (op->get_source().is_osd() &&
+ op->get_source().num() == get_primary()) {
+ // read was shed to me by the primary
+ int from = op->get_source().num();
+ assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT);
+ osd->take_peer_stat(from, op->get_peer_stat());
+ dout(10) << "read shed IN from " << op->get_source()
+ << " " << op->get_reqid()
+ << ", me = " << osd->my_stat.read_latency_mine
+ << ", them = " << op->get_peer_stat().read_latency
+ << (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"")
+ << dendl;
+ osd->logger->inc(l_osd_shdin);
+
+ // does it look like they were wrong to do so?
+ Mutex::Locker lock(osd->peer_stat_lock);
+ if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency &&
+ osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) {
+ dout(-10) << "read shed IN from " << op->get_source()
+ << " " << op->get_reqid()
+ << " and me " << osd->my_stat.read_latency_mine
+ << " > them " << op->get_peer_stat().read_latency
+ << ", but they didn't know better, sharing" << dendl;
+ osd->my_stat_on_peer[from] = osd->my_stat;
+ /*
+ osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(),
+ osd->my_stat),
+ osd->osdmap->get_inst(from));
+ */
+ }
+ } else {
+ // make sure i exist and am balanced, otherwise fw back to acker.
+ bool b;
+ if (!osd->store->exists(info.pgid.to_coll(), soid) ||
+ osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) < 0) {
+ dout(-10) << "read on replica, object " << soid
+ << " dne or no balance-reads, fw back to primary" << dendl;
+ osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
+ return;
+ }
+ }
+ }
+
+ // do it.
+ do_read_ops(op, soid, oi, op->ops, bp, data, &data_off);
+
done:
// reply
MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);