]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: only do footer with data payloads
authorSage Weil <sage@newdream.net>
Tue, 29 Apr 2008 03:31:31 +0000 (20:31 -0700)
committerSage Weil <sage@newdream.net>
Tue, 29 Apr 2008 03:31:31 +0000 (20:31 -0700)
src/kernel/messenger.c
src/msg/SimpleMessenger.cc

index d3a62f3d47ee4314c09dfa65dcc6bba6bc934f71..16dd5bccd5ab1b592403cd1c69ccd6425e26cb08 100644 (file)
@@ -331,6 +331,7 @@ static int write_partial_kvec(struct ceph_connection *con)
 {
        int ret;
 
+       dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
        while (con->out_kvec_bytes > 0) {
                ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
                                       con->out_kvec_left, con->out_kvec_bytes);
@@ -367,8 +368,8 @@ static int write_partial_msg_pages(struct ceph_connection *con,
        int ret;
        unsigned data_len = le32_to_cpu(msg->hdr.data_len);
 
-       dout(30, "write_partial_msg_pages %p on %d/%d offset %d\n",
-            con, con->out_msg_pos.page, con->out_msg->nr_pages,
+       dout(30, "write_partial_msg_pages con %p msg %p on %d/%d offset %d\n",
+            con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
             con->out_msg_pos.page_pos);
 
        while (con->out_msg_pos.page < con->out_msg->nr_pages) {
@@ -402,6 +403,16 @@ static int write_partial_msg_pages(struct ceph_connection *con,
 
        /* done */
        dout(30, "write_partial_msg_pages wrote all pages on %p\n", con);
+
+       con->out_footer.aborted =
+               cpu_to_le32(con->out_msg->pages_revoked);
+       con->out_kvec[0].iov_base = &con->out_footer;
+       con->out_kvec_bytes = con->out_kvec[0].iov_len = 
+               sizeof(con->out_footer);
+       con->out_kvec_left = 1;
+       con->out_kvec_cur = con->out_kvec;
+       con->out_msg = 0;
+       
        ret = 1;
 out:
        return ret;
@@ -435,9 +446,10 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_msg = m;  /* FIXME: do we want to take a reference here? */
 
        /* encode header */
-       dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n",
+       dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
             m, le64_to_cpu(m->hdr.seq), le32_to_cpu(m->hdr.type),
-            le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len));
+            le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len),
+            m->nr_pages);
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
        /* tag + hdr + front */
@@ -580,8 +592,10 @@ more_kvec:
        }
 
        /* msg pages? */
-       if (con->out_msg) {
+       if (con->out_msg && con->out_msg->nr_pages) {
                ret = write_partial_msg_pages(con, con->out_msg);
+               if (ret == 1)
+                       goto more_kvec;
                if (ret == 0)
                        goto done;
                if (ret < 0) {
@@ -590,19 +604,7 @@ more_kvec:
                        goto done;
                }
        }
-
-       /* msg footer */
-       if (con->out_msg) {
-               con->out_footer.aborted =
-                       cpu_to_le32(con->out_msg->pages_revoked);
-               con->out_kvec[0].iov_base = &con->out_footer;
-               con->out_kvec_bytes = con->out_kvec[0].iov_len = 
-                       sizeof(con->out_footer);
-               con->out_kvec_left = 1;
-               con->out_kvec_cur = con->out_kvec;
-               con->out_msg = 0;
-               goto more_kvec;
-       }
+       con->out_msg = 0; /* done with this message. */
 
        /* anything else pending? */
        spin_lock(&con->out_queue_lock);
@@ -730,7 +732,6 @@ static int read_message_partial(struct ceph_connection *con)
                }
        }
 
-no_data:
        /* footer */
        dout(0, "reading footer, pos %d / %d\n", 
             con->in_base_pos, sizeof(m->hdr) + sizeof(m->footer));
@@ -747,6 +748,7 @@ no_data:
                dout(0, "new pos %d\n", con->in_base_pos);
        }
 
+no_data:
        dout(20, "read_message_partial got msg %p\n", m);
 
        /* did i learn my ip? */
@@ -1423,8 +1425,8 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
             ceph_msg_type_name(le32_to_cpu(msg->hdr.type)),
             le32_to_cpu(msg->hdr.front_len),
             le32_to_cpu(msg->hdr.data_len));
-       dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg,
-            le64_to_cpu(msg->hdr.seq), ENTITY_NAME(msg->hdr.dst.name), con);
+       dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p pgs %d\n", msg,
+            le64_to_cpu(msg->hdr.seq), ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages);
        list_add_tail(&msg->list_head, &con->out_queue);
        spin_unlock(&con->out_queue_lock);
 
@@ -1473,7 +1475,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        m->pages_revoked = 0;
 
        INIT_LIST_HEAD(&m->list_head);
-       dout(20, "ceph_msg_new %p\n", m);
+       dout(20, "ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, m->nr_pages);
        return m;
 
 out2:
index a27d43e126d75646b7684b7369eb068b11220cf0..981bb12f01ac024899098c45039bd886465ad5df 100644 (file)
@@ -1646,23 +1646,23 @@ Message *Rank::Pipe::read_message()
       data.push_back(bp);
       dout(20) << "reader got data tail " << left << dendl;
     }
-  }
 
-  // footer
-  ceph_msg_footer footer;
-  if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
-    return 0;
+    // footer
+    ceph_msg_footer footer;
+    if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
+      return 0;
 
-  dout(10) << "aborted = " << le32_to_cpu(footer.aborted) << dendl;
-  if (le32_to_cpu(footer.aborted)) {
-    dout(0) << "reader got " << front.length() << " + " << data.length()
-            << " byte message from " << env.src << ".. ABORTED" << dendl;
-    // MEH FIXME 
-    Message *m = new MGenericMessage(CEPH_MSG_PING);
-    env.type = cpu_to_le32(CEPH_MSG_PING);
-    m->set_env(env);
-    return m;
-  } 
+    dout(10) << "aborted = " << le32_to_cpu(footer.aborted) << dendl;
+    if (le32_to_cpu(footer.aborted)) {
+      dout(0) << "reader got " << front.length() << " + " << data.length()
+             << " byte message from " << env.src << ".. ABORTED" << dendl;
+      // MEH FIXME 
+      Message *m = new MGenericMessage(CEPH_MSG_PING);
+      env.type = cpu_to_le32(CEPH_MSG_PING);
+      m->set_env(env);
+      return m;
+    }
+  }
 
   dout(20) << "reader got " << front.length() << " + " << data.length()
           << " byte message from " << env.src << dendl;
@@ -1818,13 +1818,15 @@ int Rank::Pipe::write_message(Message *m, ceph_msg_header *env,
   }
   assert(left == 0);
 
-  // send footer
-  struct ceph_msg_footer f;
-  memset(&f, 0, sizeof(f));
-  msgvec[msg.msg_iovlen].iov_base = (void*)&f;
-  msgvec[msg.msg_iovlen].iov_len = sizeof(f);
-  msglen += sizeof(f);
-  msg.msg_iovlen++;
+  if (data.length()) {
+    // send data footer
+    struct ceph_msg_footer f;
+    memset(&f, 0, sizeof(f));
+    msgvec[msg.msg_iovlen].iov_base = (void*)&f;
+    msgvec[msg.msg_iovlen].iov_len = sizeof(f);
+    msglen += sizeof(f);
+    msg.msg_iovlen++;
+  }
 
   // send
   if (do_sendmsg(sd, &msg, msglen))