]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: expand footer flags, include nocrc, kclient nocrc mount option
authorSage Weil <sage@newdream.net>
Tue, 7 Oct 2008 19:32:40 +0000 (12:32 -0700)
committerSage Weil <sage@newdream.net>
Tue, 7 Oct 2008 19:32:40 +0000 (12:32 -0700)
src/include/ceph_fs.h
src/kernel/messenger.c
src/kernel/super.c
src/kernel/super.h
src/msg/Message.cc
src/msg/SimpleMessenger.cc

index bbbb3d798a117d9623abfd7d58b91bed09314911..03b4c591e57c68ee14a712586f6efb171a8f6c5e 100644 (file)
@@ -414,11 +414,14 @@ struct ceph_msg_header {
 } __attribute__ ((packed));
 
 struct ceph_msg_footer {
-       __le32 aborted;
+       __le32 flags;
        __le32 front_crc;
        __le32 data_crc;
 } __attribute__ ((packed));
 
+#define CEPH_MSG_FOOTER_ABORTED   (1<<0)
+#define CEPH_MSG_FOOTER_NOCRC     (1<<1)
+
 /*
  * message types
  */
index acbb9e7849e66ed6a28937ef026e8fd3d4aa39e6..74dc92f161a3a6d403d85fc68f8addddf8e3974b 100644 (file)
@@ -18,8 +18,6 @@ int ceph_debug_msgr;
 #include "super.h"
 
 
-#define CEPH_USE_SENDPAGE
-
 /* static tag bytes */
 static char tag_ready = CEPH_MSGR_TAG_READY;
 static char tag_reset = CEPH_MSGR_TAG_RESETSESSION;
@@ -676,9 +674,11 @@ out:
 static int write_partial_msg_pages(struct ceph_connection *con,
                                   struct ceph_msg *msg)
 {
-       struct kvec kv;
        int ret;
        unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+       struct ceph_client *client = con->msgr->parent;
+       int crc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC);
+       size_t len;
 
        dout(30, "write_partial_msg_pages con %p msg %p on %d/%d offset %d\n",
             con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
@@ -686,36 +686,41 @@ static int write_partial_msg_pages(struct ceph_connection *con,
 
        while (con->out_msg_pos.page < con->out_msg->nr_pages) {
                struct page *page = NULL;
-               void *kaddr;
+               void *kaddr = 0;
 
                mutex_lock(&msg->page_mutex);
                if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
-                       kaddr = kmap(page);
+                       if (crc)
+                               kaddr = kmap(page);
                } else {
                        /*dout(60, "using zero page\n");*/
-                       kaddr = page_address(con->msgr->zero_page);
+                       if (crc)
+                               kaddr = page_address(con->msgr->zero_page);
                }
-               kv.iov_base = kaddr + con->out_msg_pos.page_pos;
-               kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
-                                (int)(data_len - con->out_msg_pos.data_pos));
-               if (!con->out_msg_pos.did_page_crc) {
+               len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
+                         (int)(data_len - con->out_msg_pos.data_pos));
+               if (crc && !con->out_msg_pos.did_page_crc) {
+                       void *base = kaddr + con->out_msg_pos.page_pos;
+
                        con->out_msg->footer.data_crc =
                                crc32c_le(con->out_msg->footer.data_crc,
-                                         kv.iov_base, kv.iov_len);
+                                         base, len);
                        con->out_msg_pos.did_page_crc = 1;
                }
 
-#ifndef CEPH_USE_SENDPAGE
-               ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len, 1);
-#else
                if (msg->pages)
-                       ret = kernel_sendpage(con->sock, page, con->out_msg_pos.page_pos, kv.iov_len, MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
+                       ret = kernel_sendpage(con->sock, page,
+                                             con->out_msg_pos.page_pos, len,
+                                             MSG_DONTWAIT | MSG_NOSIGNAL |
+                                             MSG_MORE);
                else
-                       ret = kernel_sendpage(con->sock, con->msgr->zero_page, con->out_msg_pos.page_pos, kv.iov_len, MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
-#endif
+                       ret = kernel_sendpage(con->sock, con->msgr->zero_page,
+                                             con->out_msg_pos.page_pos, len,
+                                             MSG_DONTWAIT | MSG_NOSIGNAL |
+                                             MSG_MORE);
 
-               if (msg->pages)
+               if (crc && msg->pages)
                        kunmap(page);
 
                mutex_unlock(&msg->page_mutex);
@@ -724,7 +729,7 @@ static int write_partial_msg_pages(struct ceph_connection *con,
                con->out_msg_pos.data_pos += ret;
                con->out_msg_pos.page_pos += ret;
 
-               if (ret == kv.iov_len) {
+               if (ret == len) {
                        con->out_msg_pos.page_pos = 0;
                        con->out_msg_pos.page++;
                        con->out_msg_pos.did_page_crc = 0;
@@ -735,6 +740,8 @@ static int write_partial_msg_pages(struct ceph_connection *con,
        dout(30, "write_partial_msg_pages wrote all pages on %p\n", con);
 
        /* queue up footer, too */
+       if (!crc)
+               con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
        con->out_kvec[0].iov_base = &con->out_msg->footer;
        con->out_kvec_bytes = con->out_kvec[0].iov_len =
                sizeof(con->out_msg->footer);
@@ -803,7 +810,7 @@ static void prepare_write_message(struct ceph_connection *con)
        /* fill in crc (except data pages), footer */
        con->out_msg->hdr.crc = crc32c_le(0, (void *)&m->hdr,
                                          sizeof(m->hdr) - sizeof(m->hdr.crc));
-       con->out_msg->footer.aborted = 0;
+       con->out_msg->footer.flags = 0;
        con->out_msg->footer.front_crc = crc32c_le(0, m->front.iov_base,
                                                   m->front.iov_len);
        con->out_msg->footer.data_crc = 0;
@@ -1951,7 +1958,7 @@ struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
        /* revoke old message's pages */
        mutex_lock(&old->page_mutex);
        old->pages = 0;
-       old->footer.aborted = 1;
+       old->footer.flags |= CEPH_MSG_FOOTER_ABORTED;
        mutex_unlock(&old->page_mutex);
 
        ceph_msg_put(old);
index beb3f11cb971845442cc72600640a0aa1fff5923..83fe33d1eea4b698fa43a4e8461a9dd0a2ec3664 100644 (file)
@@ -385,6 +385,7 @@ enum {
        Opt_nodirstat,
        Opt_rbytes,
        Opt_norbytes,
+       Opt_nocrc,
 };
 
 static match_table_t arg_tokens = {
@@ -414,6 +415,7 @@ static match_table_t arg_tokens = {
        {Opt_nodirstat, "nodirstat"},
        {Opt_rbytes, "rbytes"},
        {Opt_norbytes, "norbytes"},
+       {Opt_nocrc, "nocrc"},
        {-1, NULL}
 };
 
@@ -627,6 +629,9 @@ static int parse_mount_args(int flags, char *options, const char *dev_name,
                case Opt_norbytes:
                        args->flags &= ~CEPH_MOUNT_RBYTES;
                        break;
+               case Opt_nocrc:
+                       args->flags |= CEPH_MOUNT_NOCRC;
+                       break;
 
                default:
                        BUG_ON(token);
index b5df58bd7816139f6982157d942b6f6082ae00f4..b39a8e85e75db1d2973f5a51c2c9686f44578353 100644 (file)
@@ -102,6 +102,7 @@ static inline unsigned long time_sub(unsigned long a, unsigned long b)
 #define CEPH_MOUNT_UNSAFE_WRITEBACK (1<<3)
 #define CEPH_MOUNT_DIRSTAT       (1<<4)
 #define CEPH_MOUNT_RBYTES        (1<<5)
+#define CEPH_MOUNT_NOCRC         (1<<6)
 
 #define CEPH_MOUNT_DEFAULT   (CEPH_MOUNT_RBYTES)
 
index e1a0fa22257bcb0910436207646c674b0e0d13a7..1d909aa40f50ea2995c26c6d9599aa62ed806bff 100644 (file)
@@ -126,7 +126,8 @@ decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
     dout(0) << "bad crc in front " << front_crc << " != " << footer.front_crc << dendl;
     return 0;
   }
-  if (data_crc != footer.data_crc) {
+  if (data_crc != footer.data_crc &&
+      !(footer.flags & CEPH_MSG_FOOTER_NOCRC)) {
     dout(0) << "bad crc in data " << data_crc << " != " << footer.data_crc << dendl;
     return 0;
   }
index ae5ad527472f6b7eb0fcb3b8178458efedb8b6c0..57a56d372eddaf08f98012e9c8bfad452f241f93 100644 (file)
@@ -1781,8 +1781,9 @@ Message *Rank::Pipe::read_message()
   if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
     return 0;
   
-  dout(10) << "aborted = " << le32_to_cpu(footer.aborted) << dendl;
-  if (le32_to_cpu(footer.aborted)) {
+  int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
+  dout(10) << "aborted = " << aborted << dendl;
+  if (aborted) {
     dout(0) << "reader got " << front.length() << " + " << data.length()
            << " byte message from " << header.src << ".. ABORTED" << dendl;
     // MEH FIXME 
@@ -1877,7 +1878,7 @@ int Rank::Pipe::write_message(Message *m)
   // get envelope, buffers
   header.front_len = m->get_payload().length();
   header.data_len = m->get_data().length();
-  footer.aborted = 0;
+  footer.flags = 0;
   m->calc_header_crc();
 
   bufferlist blist = m->get_payload();