From 8ec32b2dfe0f0bd6a28775ca6851930f5229760b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 19 May 2009 10:29:12 -0700 Subject: [PATCH] osd: factor out do_read_ops helper --- src/osd/ReplicatedPG.cc | 196 +++++++++++++++++++++------------------- src/osd/ReplicatedPG.h | 5 + 2 files changed, 108 insertions(+), 93 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 7e93d19ba02df..2adda1d08e800 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -640,92 +640,22 @@ int ReplicatedPG::pick_read_snap(sobject_t& soid, object_info_t& coi) } -void ReplicatedPG::op_read(MOSDOp *op) +int ReplicatedPG::do_read_ops(MOSDOp *op, sobject_t& soid, object_info_t& oi, + vector &ops, bufferlist::iterator& bp, + bufferlist& data, + int *data_off) { - object_t oid = op->get_oid(); - sobject_t soid(oid, op->get_snapid()); - - dout(10) << "op_read " << soid << " " << op->ops << dendl; - - bufferlist::iterator bp = op->get_data().begin(); - bufferlist data; - int data_off = 0; int result = 0; - // pick revision - object_info_t oi(soid); - if (soid.snap) { - result = pick_read_snap(soid, oi); - if (result == -EAGAIN) { - wait_for_missing_object(soid, op); - return; - } - if (result != 0) - goto done; // we have no revision for this request. - } - - // wrlocked? - if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) && - block_if_wrlocked(op, oi)) - return; - - - // !primary and unbalanced? - // (ignore ops forwarded from the primary) - if (!is_primary()) { - if (op->get_source().is_osd() && - op->get_source().num() == get_primary()) { - // read was shed to me by the primary - int from = op->get_source().num(); - assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT); - osd->take_peer_stat(from, op->get_peer_stat()); - dout(10) << "read shed IN from " << op->get_source() - << " " << op->get_reqid() - << ", me = " << osd->my_stat.read_latency_mine - << ", them = " << op->get_peer_stat().read_latency - << (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"") - << dendl; - osd->logger->inc(l_osd_shdin); - - // does it look like they were wrong to do so? - Mutex::Locker lock(osd->peer_stat_lock); - if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency && - osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) { - dout(-10) << "read shed IN from " << op->get_source() - << " " << op->get_reqid() - << " and me " << osd->my_stat.read_latency_mine - << " > them " << op->get_peer_stat().read_latency - << ", but they didn't know better, sharing" << dendl; - osd->my_stat_on_peer[from] = osd->my_stat; - /* - osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(), - osd->my_stat), - osd->osdmap->get_inst(from)); - */ - } - } else { - // make sure i exist and am balanced, otherwise fw back to acker. - bool b; - if (!osd->store->exists(info.pgid.to_coll(), soid) || - osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) < 0) { - dout(-10) << "read on replica, object " << soid - << " dne or no balance-reads, fw back to primary" << dendl; - osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary())); - return; - } - } - } - - // do it. - for (vector::iterator p = op->ops.begin(); p != op->ops.end(); p++) { + for (vector::iterator p = ops.begin(); p != ops.end(); p++) { switch (p->op) { case CEPH_OSD_OP_READ: { // read into a buffer bufferlist bl; int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl); - if (data.length() == 0) - data_off = p->offset; + if (data.length() == 0 && data_off) + *data_off = p->offset; data.claim(bl); if (r >= 0) p->length = r; @@ -733,12 +663,12 @@ void ReplicatedPG::op_read(MOSDOp *op) result = r; p->length = 0; } - dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << oid << dendl; + dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << soid << dendl; } osd->logger->inc(l_osd_c_rd); osd->logger->inc(l_osd_c_rdb, p->length); break; - + case CEPH_OSD_OP_RDCALL: { string cname, mname; @@ -794,18 +724,15 @@ void ReplicatedPG::op_read(MOSDOp *op) result = r; } break; - + case CEPH_OSD_OP_GREP: - { - - } break; - + case CEPH_OSD_OP_MASKTRUNC: if (p != op->ops.begin()) { ceph_osd_op& rd = *(p - 1); ceph_osd_op& m = *p; - + // are we beyond truncate_size? if (rd.offset + rd.length > m.truncate_size) { __u32 seq = 0; @@ -815,22 +742,22 @@ void ReplicatedPG::op_read(MOSDOp *op) ::decode(seq, p); ::decode(tm, p); } - + // truncated portion of the read unsigned from = MAX(rd.offset, m.truncate_size); // also end of data unsigned to = rd.offset + rd.length; - unsigned trim = to-from; - + unsigned trim = to-from; + rd.length = rd.length - trim; - + dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl; - + bufferlist keep; keep.substr_of(data, 0, data.length() - trim); bufferlist truncated; // everthing after 'from' truncated.substr_of(data, data.length() - trim, trim); keep.swap(data); - + if (seq == rd.truncate_seq) { // keep any valid extents beyond 'from' unsigned data_end = from; @@ -852,7 +779,7 @@ void ReplicatedPG::op_read(MOSDOp *op) rd.length = rd.length + bp.length(); data_end += bp.length(); } - + bufferlist b; b.substr_of(truncated, s-from, l); dout(20) << " adding " << b.length() << " bytes from " << s << "~" << l << dendl; @@ -873,8 +800,91 @@ void ReplicatedPG::op_read(MOSDOp *op) result = -EOPNOTSUPP; assert(0); // for now } + if (result) + break; } - + return result; +} + +void ReplicatedPG::op_read(MOSDOp *op) +{ + object_t oid = op->get_oid(); + sobject_t soid(oid, op->get_snapid()); + + dout(10) << "op_read " << soid << " " << op->ops << dendl; + + bufferlist::iterator bp = op->get_data().begin(); + bufferlist data; + int data_off = 0; + int result = 0; + + // pick revision + object_info_t oi(soid); + if (soid.snap) { + result = pick_read_snap(soid, oi); + if (result == -EAGAIN) { + wait_for_missing_object(soid, op); + return; + } + if (result != 0) + goto done; // we have no revision for this request. + } + + // wrlocked? + if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) && + block_if_wrlocked(op, oi)) + return; + + + // !primary and unbalanced? + // (ignore ops forwarded from the primary) + if (!is_primary()) { + if (op->get_source().is_osd() && + op->get_source().num() == get_primary()) { + // read was shed to me by the primary + int from = op->get_source().num(); + assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT); + osd->take_peer_stat(from, op->get_peer_stat()); + dout(10) << "read shed IN from " << op->get_source() + << " " << op->get_reqid() + << ", me = " << osd->my_stat.read_latency_mine + << ", them = " << op->get_peer_stat().read_latency + << (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"") + << dendl; + osd->logger->inc(l_osd_shdin); + + // does it look like they were wrong to do so? + Mutex::Locker lock(osd->peer_stat_lock); + if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency && + osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) { + dout(-10) << "read shed IN from " << op->get_source() + << " " << op->get_reqid() + << " and me " << osd->my_stat.read_latency_mine + << " > them " << op->get_peer_stat().read_latency + << ", but they didn't know better, sharing" << dendl; + osd->my_stat_on_peer[from] = osd->my_stat; + /* + osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(), + osd->my_stat), + osd->osdmap->get_inst(from)); + */ + } + } else { + // make sure i exist and am balanced, otherwise fw back to acker. + bool b; + if (!osd->store->exists(info.pgid.to_coll(), soid) || + osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) < 0) { + dout(-10) << "read on replica, object " << soid + << " dne or no balance-reads, fw back to primary" << dendl; + osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary())); + return; + } + } + } + + // do it. + do_read_ops(op, soid, oi, op->ops, bp, data, &data_off); + done: // reply MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 58ebb3391ec9e..0ddf1e05814f5 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -214,6 +214,11 @@ protected: void op_read(MOSDOp *op); void op_modify(MOSDOp *op); + int do_read_ops(MOSDOp *op, sobject_t& soid, object_info_t& oi, + vector& ops, bufferlist::iterator& bp, + bufferlist& data, + int *data_off); + void sub_op_modify(MOSDSubOp *op); void sub_op_modify_reply(MOSDSubOpReply *reply); void sub_op_push(MOSDSubOp *op); -- 2.39.5