]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: post rx buffer to msgr if target bufferlist is present
authorSage Weil <sage@newdream.net>
Mon, 15 Nov 2010 04:28:44 +0000 (20:28 -0800)
committerSage Weil <sage@newdream.net>
Mon, 15 Nov 2010 04:39:07 +0000 (20:39 -0800)
Signed-off-by: Sage Weil <sage@newdream.net>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 6eac0a52ca7218e902495b77704f57f9716d6d8d..3ae3b99b44b6c5373464ec4313372abda5371d46 100644 (file)
@@ -475,6 +475,20 @@ tid_t Objecter::op_submit(Op *op)
     if (op->onack)
       flags |= CEPH_OSD_FLAG_ACK;
 
+    if (op->con) {
+      if (op->outbl->length()) {
+       dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
+       op->con->revoke_rx_buffer(op->tid);
+      }
+      op->con->put();
+    }
+    op->con = messenger->get_connection(osdmap->get_inst(pg.primary()));
+    assert(op->con);
+    if (op->outbl && op->outbl->length()) {
+      dout(20) << " posting rx buffer for " << op->tid << " on " << op->con << dendl;
+      op->con->post_rx_buffer(op->tid, *op->outbl);
+    }
+
     ceph_object_layout ol;
     ol.ol_pgid = op->pgid.v;
     ol.ol_stripe_unit = 0;
@@ -497,7 +511,7 @@ tid_t Objecter::op_submit(Op *op)
     if (op->priority)
       m->set_priority(op->priority);
 
-    messenger->send_message(m, osdmap->get_inst(pg.primary()));
+    messenger->send_message(m, op->con);
   } else 
     maybe_request_map();
   
@@ -558,18 +572,16 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
          << dendl;
   Op *op = op_osd[ tid ];
 
-  Context *onack = 0;
-  Context *oncommit = 0;
-
-  PG &pg = get_pg( m->get_pg() );
-
-  // ignore?
-  if (pg.acker() != m->get_source().num()) {
-    dout(7) << " ignoring ack|commit from non-acker" << dendl;
+  if (op->con != m->get_connection()) {
+    dout(7) << " ignoring reply from " << m->get_source_inst()
+           << ", i last sent to " << op->con->get_peer_addr() << dendl;
     m->put();
     return;
   }
-  
+
+  Context *onack = 0;
+  Context *oncommit = 0;
+
   int rc = m->get_result();
 
   if (rc == -EAGAIN) {
@@ -585,6 +597,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   // got data?
   if (op->outbl) {
+    if (op->outbl->length())
+      op->con->revoke_rx_buffer(op->tid);
     m->claim_data(*op->outbl);
     op->outbl = 0;
   }
@@ -606,6 +620,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   // done with this tid?
   if (!op->onack && !op->oncommit) {
+    PG &pg = get_pg( m->get_pg() );
     assert(pg.active_tids.count(tid));
     pg.active_tids.erase(tid);
     dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg()
@@ -614,6 +629,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       close_pg( m->get_pg() );
     put_op_budget(op);
     op_osd.erase( tid );
+    if (op->con)
+      op->con->put();
     delete op;
   }
   
index 5a82eb5f7229549994073ba9e45ef083eaa8cfc7..a0d4c9d90bba261460c0f1048c2cebaad616bd53 100644 (file)
@@ -225,6 +225,8 @@ public:
     object_locator_t oloc;
     pg_t pgid;
 
+    Connection *con;
+
     vector<OSDOp> ops;
 
     snapid_t snapid;
@@ -246,6 +248,7 @@ public:
        int f, Context *ac, Context *co) :
       session_item(this),
       oid(o), oloc(ol),
+      con(NULL),
       snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), 
       tid(0), attempts(0),
       paused(false) {
@@ -897,7 +900,6 @@ public:
   void ms_handle_connect(Connection *con);
   void ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con);
-
 };
 
 #endif