]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: factor out do_read_ops helper
authorSage Weil <sage@newdream.net>
Tue, 19 May 2009 17:29:12 +0000 (10:29 -0700)
committerSage Weil <sage@newdream.net>
Tue, 19 May 2009 17:29:12 +0000 (10:29 -0700)
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 7e93d19ba02dfb5f8d6c95d2c8f61139e9bc1b64..2adda1d08e800868738ced30d9b34b4349b89ff2 100644 (file)
@@ -640,92 +640,22 @@ int ReplicatedPG::pick_read_snap(sobject_t& soid, object_info_t& coi)
 } 
 
 
-void ReplicatedPG::op_read(MOSDOp *op)
+int ReplicatedPG::do_read_ops(MOSDOp *op, sobject_t& soid, object_info_t& oi,
+                             vector<ceph_osd_op> &ops, bufferlist::iterator& bp,
+                             bufferlist& data,
+                             int *data_off)
 {
-  object_t oid = op->get_oid();
-  sobject_t soid(oid, op->get_snapid());
-
-  dout(10) << "op_read " << soid << " " << op->ops << dendl;
-
-  bufferlist::iterator bp = op->get_data().begin();
-  bufferlist data;
-  int data_off = 0;
   int result = 0;
 
-  // pick revision
-  object_info_t oi(soid);
-  if (soid.snap) {
-    result = pick_read_snap(soid, oi);
-    if (result == -EAGAIN) {
-      wait_for_missing_object(soid, op);
-      return;
-    }
-    if (result != 0)
-      goto done;    // we have no revision for this request.
-  } 
-
-  // wrlocked?
-  if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) &&
-      block_if_wrlocked(op, oi)) 
-    return;
-
-
-  // !primary and unbalanced?
-  //  (ignore ops forwarded from the primary)
-  if (!is_primary()) {
-    if (op->get_source().is_osd() &&
-       op->get_source().num() == get_primary()) {
-      // read was shed to me by the primary
-      int from = op->get_source().num();
-      assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT);
-      osd->take_peer_stat(from, op->get_peer_stat());
-      dout(10) << "read shed IN from " << op->get_source() 
-               << " " << op->get_reqid()
-               << ", me = " << osd->my_stat.read_latency_mine
-               << ", them = " << op->get_peer_stat().read_latency
-               << (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"")
-               << dendl;
-      osd->logger->inc(l_osd_shdin);
-
-      // does it look like they were wrong to do so?
-      Mutex::Locker lock(osd->peer_stat_lock);
-      if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency &&
-         osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) {
-       dout(-10) << "read shed IN from " << op->get_source() 
-                 << " " << op->get_reqid()
-                 << " and me " << osd->my_stat.read_latency_mine
-                 << " > them " << op->get_peer_stat().read_latency
-                 << ", but they didn't know better, sharing" << dendl;
-       osd->my_stat_on_peer[from] = osd->my_stat;
-       /*
-       osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(),
-                                                 osd->my_stat),
-                                    osd->osdmap->get_inst(from));
-       */
-      }
-    } else {
-      // make sure i exist and am balanced, otherwise fw back to acker.
-      bool b;
-      if (!osd->store->exists(info.pgid.to_coll(), soid) || 
-         osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) < 0) {
-       dout(-10) << "read on replica, object " << soid 
-                 << " dne or no balance-reads, fw back to primary" << dendl;
-       osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
-       return;
-      }
-    }
-  }
-
-  // do it.
-  for (vector<ceph_osd_op>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+  for (vector<ceph_osd_op>::iterator p = ops.begin(); p != ops.end(); p++) {
     switch (p->op) {
     case CEPH_OSD_OP_READ:
       {
        // read into a buffer
        bufferlist bl;
        int r = osd->store->read(info.pgid.to_coll(), soid, p->offset, p->length, bl);
-       if (data.length() == 0)
-         data_off = p->offset;
+       if (data.length() == 0 && data_off)
+         *data_off = p->offset;
        data.claim(bl);
        if (r >= 0) 
          p->length = r;
@@ -733,12 +663,12 @@ void ReplicatedPG::op_read(MOSDOp *op)
          result = r;
          p->length = 0;
        }
-       dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << oid << dendl;
+       dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << soid << dendl;
       }
       osd->logger->inc(l_osd_c_rd);
       osd->logger->inc(l_osd_c_rdb, p->length);
       break;
-      
+
     case CEPH_OSD_OP_RDCALL:
       {
        string cname, mname;
@@ -794,18 +724,15 @@ void ReplicatedPG::op_read(MOSDOp *op)
          result = r;
       }
       break;
-      
+
     case CEPH_OSD_OP_GREP:
-      {
-       
-      }
       break;
-      
+
     case CEPH_OSD_OP_MASKTRUNC:
       if (p != op->ops.begin()) {
        ceph_osd_op& rd = *(p - 1);
        ceph_osd_op& m = *p;
-
+       
        // are we beyond truncate_size?
        if (rd.offset + rd.length > m.truncate_size) {  
          __u32 seq = 0;
@@ -815,22 +742,22 @@ void ReplicatedPG::op_read(MOSDOp *op)
            ::decode(seq, p);
            ::decode(tm, p);
          }
-
+         
          // truncated portion of the read
          unsigned from = MAX(rd.offset, m.truncate_size);  // also end of data
          unsigned to = rd.offset + rd.length;
-         unsigned trim = to-from;
-
+         unsigned trim = to-from;
+         
          rd.length = rd.length - trim;
-
+         
          dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl;
-
+         
          bufferlist keep;
          keep.substr_of(data, 0, data.length() - trim);
          bufferlist truncated;  // everthing after 'from'
          truncated.substr_of(data, data.length() - trim, trim);
          keep.swap(data);
-
+         
          if (seq == rd.truncate_seq) {
            // keep any valid extents beyond 'from'
            unsigned data_end = from;
@@ -852,7 +779,7 @@ void ReplicatedPG::op_read(MOSDOp *op)
                  rd.length = rd.length + bp.length();
                  data_end += bp.length();
                }
-               
+
                bufferlist b;
                b.substr_of(truncated, s-from, l);
                dout(20) << "  adding " << b.length() << " bytes from " << s << "~" << l << dendl;
@@ -873,8 +800,91 @@ void ReplicatedPG::op_read(MOSDOp *op)
       result = -EOPNOTSUPP;
       assert(0);  // for now
     }
+    if (result)
+      break;
   }
-  
+  return result;
+}
+
+void ReplicatedPG::op_read(MOSDOp *op)
+{
+  object_t oid = op->get_oid();
+  sobject_t soid(oid, op->get_snapid());
+
+  dout(10) << "op_read " << soid << " " << op->ops << dendl;
+
+  bufferlist::iterator bp = op->get_data().begin();
+  bufferlist data;
+  int data_off = 0;
+  int result = 0;
+
+  // pick revision
+  object_info_t oi(soid);
+  if (soid.snap) {
+    result = pick_read_snap(soid, oi);
+    if (result == -EAGAIN) {
+      wait_for_missing_object(soid, op);
+      return;
+    }
+    if (result != 0)
+      goto done;    // we have no revision for this request.
+  } 
+
+  // wrlocked?
+  if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) &&
+      block_if_wrlocked(op, oi)) 
+    return;
+
+
+  // !primary and unbalanced?
+  //  (ignore ops forwarded from the primary)
+  if (!is_primary()) {
+    if (op->get_source().is_osd() &&
+       op->get_source().num() == get_primary()) {
+      // read was shed to me by the primary
+      int from = op->get_source().num();
+      assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT);
+      osd->take_peer_stat(from, op->get_peer_stat());
+      dout(10) << "read shed IN from " << op->get_source() 
+               << " " << op->get_reqid()
+               << ", me = " << osd->my_stat.read_latency_mine
+               << ", them = " << op->get_peer_stat().read_latency
+               << (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"")
+               << dendl;
+      osd->logger->inc(l_osd_shdin);
+
+      // does it look like they were wrong to do so?
+      Mutex::Locker lock(osd->peer_stat_lock);
+      if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency &&
+         osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) {
+       dout(-10) << "read shed IN from " << op->get_source() 
+                 << " " << op->get_reqid()
+                 << " and me " << osd->my_stat.read_latency_mine
+                 << " > them " << op->get_peer_stat().read_latency
+                 << ", but they didn't know better, sharing" << dendl;
+       osd->my_stat_on_peer[from] = osd->my_stat;
+       /*
+       osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(),
+                                                 osd->my_stat),
+                                    osd->osdmap->get_inst(from));
+       */
+      }
+    } else {
+      // make sure i exist and am balanced, otherwise fw back to acker.
+      bool b;
+      if (!osd->store->exists(info.pgid.to_coll(), soid) || 
+         osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) < 0) {
+       dout(-10) << "read on replica, object " << soid 
+                 << " dne or no balance-reads, fw back to primary" << dendl;
+       osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
+       return;
+      }
+    }
+  }
+
+  // do it.
+  do_read_ops(op, soid, oi, op->ops, bp, data, &data_off);
+   
  done:
   // reply
   MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK); 
index 58ebb3391ec9e3390bf2099b6e483e43d1316d60..0ddf1e05814f5503848a75b448402e8d4ef82ae1 100644 (file)
@@ -214,6 +214,11 @@ protected:
   void op_read(MOSDOp *op);
   void op_modify(MOSDOp *op);
 
+  int do_read_ops(MOSDOp *op, sobject_t& soid, object_info_t& oi,
+                vector<ceph_osd_op>& ops, bufferlist::iterator& bp,
+                bufferlist& data,
+                int *data_off);
+
   void sub_op_modify(MOSDSubOp *op);
   void sub_op_modify_reply(MOSDSubOpReply *reply);
   void sub_op_push(MOSDSubOp *op);