]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
add message footer. kclient safely revokes pages away from sending ceph_msg
authorSage Weil <sage@newdream.net>
Mon, 28 Apr 2008 23:07:37 +0000 (16:07 -0700)
committerSage Weil <sage@newdream.net>
Mon, 28 Apr 2008 23:07:37 +0000 (16:07 -0700)
src/include/ceph_fs.h
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/osd_client.c
src/mds/Locker.cc
src/msg/SimpleMessenger.cc

index fe6220c16747c8a04f009c40901663a0a5fa5ef5..5402073482d1b0202b2764bce4edca7ac4aeb26d 100644 (file)
@@ -232,7 +232,7 @@ struct ceph_entity_inst {
 
 
 /*
- * message header
+ * message header, footer
  */
 struct ceph_msg_header {
        __le64 seq;    /* message seq# for this session */
@@ -243,6 +243,10 @@ struct ceph_msg_header {
        struct ceph_entity_inst src, dst;
 } __attribute__ ((packed));
 
+struct ceph_msg_footer {
+       __le32 aborted;
+       __le32 csum;
+} __attribute__ ((packed));
 
 /*
  * message types
index 0c2f31f0fffd9812b59999e258f3501fcc57e0bd..d3a62f3d47ee4314c09dfa65dcc6bba6bc934f71 100644 (file)
@@ -372,13 +372,24 @@ static int write_partial_msg_pages(struct ceph_connection *con,
             con->out_msg_pos.page_pos);
 
        while (con->out_msg_pos.page < con->out_msg->nr_pages) {
-               struct page *page = msg->pages[con->out_msg_pos.page];
-               void *kaddr = kmap(page);
+               struct page *page;
+               void *kaddr;
+               mutex_lock(&msg->page_mutex);
+               page = msg->pages[con->out_msg_pos.page];
+               if (page) 
+                       kaddr = kmap(page);
+               else {
+                       derr(0, "using zero page\n");
+                       kaddr = page_address(page);
+                       page = 0;
+               }
                kv.iov_base = kaddr + con->out_msg_pos.page_pos;
                kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
                                 (int)(data_len - con->out_msg_pos.data_pos));
                ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len);
-               kunmap(page);
+               if (page)
+                       kunmap(page);
+               mutex_unlock(&msg->page_mutex);
                if (ret <= 0)
                        goto out;
                con->out_msg_pos.data_pos += ret;
@@ -391,7 +402,6 @@ static int write_partial_msg_pages(struct ceph_connection *con,
 
        /* done */
        dout(30, "write_partial_msg_pages wrote all pages on %p\n", con);
-       con->out_msg = 0;
        ret = 1;
 out:
        return ret;
@@ -558,6 +568,7 @@ more:
        }
 
        /* kvec data queued? */
+more_kvec:
        if (con->out_kvec_left) {
                ret = write_partial_kvec(con);
                if (ret == 0)
@@ -580,6 +591,19 @@ more:
                }
        }
 
+       /* msg footer */
+       if (con->out_msg) {
+               con->out_footer.aborted =
+                       cpu_to_le32(con->out_msg->pages_revoked);
+               con->out_kvec[0].iov_base = &con->out_footer;
+               con->out_kvec_bytes = con->out_kvec[0].iov_len = 
+                       sizeof(con->out_footer);
+               con->out_kvec_left = 1;
+               con->out_kvec_cur = con->out_kvec;
+               con->out_msg = 0;
+               goto more_kvec;
+       }
+
        /* anything else pending? */
        spin_lock(&con->out_queue_lock);
        if (!list_empty(&con->out_queue)) {
@@ -638,15 +662,13 @@ static int read_message_partial(struct ceph_connection *con)
        dout(20, "read_message_partial con %p msg %p\n", con, m);
 
        /* header */
-       while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
-               left = sizeof(struct ceph_msg_header) - con->in_base_pos;
+       while (con->in_base_pos < sizeof(m->hdr)) {
+               left = sizeof(m->hdr) - con->in_base_pos;
                ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos,
                                       left);
                if (ret <= 0)
                        return ret;
                con->in_base_pos += ret;
-               if (con->in_base_pos == sizeof(struct ceph_msg_header))
-                       break;
        }
 
        /* front */
@@ -669,7 +691,7 @@ static int read_message_partial(struct ceph_connection *con)
        data_len = le32_to_cpu(m->hdr.data_len);
        data_off = le32_to_cpu(m->hdr.data_off);
        if (data_len == 0)
-               goto done;
+               goto no_data;
        if (m->nr_pages == 0) {
                con->in_msg_pos.page = 0;
                con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
@@ -708,7 +730,23 @@ static int read_message_partial(struct ceph_connection *con)
                }
        }
 
-done:
+no_data:
+       /* footer */
+       dout(0, "reading footer, pos %d / %d\n", 
+            con->in_base_pos, sizeof(m->hdr) + sizeof(m->footer));
+       while (con->in_base_pos < sizeof(m->hdr) + sizeof(m->footer)) {
+               left = sizeof(m->hdr) + sizeof(m->footer) - con->in_base_pos;
+               dout(0, "left %d\n", left);
+               ret = ceph_tcp_recvmsg(con->sock, &m->footer +
+                                      (con->in_base_pos - sizeof(m->hdr)),
+                                      left);
+               dout(0, "got %d\n", ret);
+               if (ret <= 0)
+                       return ret;
+               con->in_base_pos += ret;
+               dout(0, "new pos %d\n", con->in_base_pos);
+       }
+
        dout(20, "read_message_partial got msg %p\n", m);
 
        /* did i learn my ip? */
@@ -1219,6 +1257,13 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        INIT_LIST_HEAD(&msgr->con_accepting);
        INIT_RADIX_TREE(&msgr->con_tree, GFP_KERNEL);
 
+       msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
+       if (!msgr->zero_page) {
+               kfree(msgr);
+               return ERR_PTR(-ENOMEM);
+       }
+       kmap(msgr->zero_page);
+
        /* pick listening address */
        if (myaddr) {
                msgr->inst.addr = *myaddr;
@@ -1262,6 +1307,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
        /* stop listener */
        ceph_sock_release(msgr->listen_sock);
 
+       kunmap(msgr->zero_page);
+       __free_page(msgr->zero_page);
        kfree(msgr);
 }
 
@@ -1403,6 +1450,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        if (m == NULL)
                goto out;
        atomic_set(&m->nref, 1);
+       mutex_init(&m->page_mutex);
+
        m->hdr.type = cpu_to_le32(type);
        m->hdr.front_len = cpu_to_le32(front_len);
        m->hdr.data_len = cpu_to_le32(page_len);
@@ -1421,6 +1470,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        /* pages */
        m->nr_pages = calc_pages_for(page_off, page_len);
        m->pages = pages;
+       m->pages_revoked = 0;
 
        INIT_LIST_HEAD(&m->list_head);
        dout(20, "ceph_msg_new %p\n", m);
index e602a79e19085744f55a7e574a7e47adbeb9193d..c6bbae3a1ffe61d7726a04163058c880b73a3a15 100644 (file)
@@ -1,11 +1,12 @@
 #ifndef __FS_CEPH_MESSENGER_H
 #define __FS_CEPH_MESSENGER_H
 
-#include <linux/uio.h>
+#include <linux/mutex.h>
 #include <linux/net.h>
 #include <linux/radix-tree.h>
-#include <linux/workqueue.h>
+#include <linux/uio.h>
 #include <linux/version.h>
+#include <linux/workqueue.h>
 
 #include "ceph_fs.h"
 
@@ -44,12 +45,16 @@ struct ceph_messenger {
        struct list_head con_all;        /* all connections */
        struct list_head con_accepting;  /* accepting */
        struct radix_tree_root con_tree; /*  established */
+       struct page *zero_page;
 };
 
 struct ceph_msg {
        struct ceph_msg_header hdr;     /* header */
+       struct ceph_msg_footer footer;  /* footer */
        struct kvec front;              /* first bit of message */
+       struct mutex page_mutex;
        struct page **pages;            /* data payload.  NOT OWNER. */
+       int pages_revoked;              /* if true, pages revoked before sent */
        unsigned nr_pages;              /* size of page array */
        struct list_head list_head;
        atomic_t nref;
@@ -109,6 +114,7 @@ struct ceph_connection {
                *out_kvec_cur;
        int out_kvec_left;   /* kvec's left */
        int out_kvec_bytes;  /* bytes left */
+       struct ceph_msg_footer out_footer;
        struct ceph_msg *out_msg;
        struct ceph_msg_pos out_msg_pos;
 
index 5be83df74ef04c2bbd82b7d037c54f3e91e3ea3f..b0b8fa28d8d505942e3a37f385f273ebb796da4f 100644 (file)
@@ -449,7 +449,12 @@ int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
 
        unregister_request(osdc, req);
        if (rc < 0) {
-               printk(KERN_ERR "osdc do_request err %d, watch out\n", rc);
+               struct ceph_msg *msg = req->r_request;
+               printk(KERN_ERR "osdc do_request err %d on %p\n", rc, msg);
+               mutex_lock(&msg->page_mutex);
+               msg->pages_revoked = 1;
+               memset(&msg->pages, 0, sizeof(void *) * msg->nr_pages);
+               mutex_unlock(&msg->page_mutex);
                return rc;
        }
 
index 694d9c0e9393243f8fa17e17caa547b46197923b..db698f289273dbbbc6cec46e9d88051be46d4618 100644 (file)
@@ -1139,7 +1139,7 @@ int Locker::issue_client_lease(CDentry *dn, int client,
       (diri->get_client_cap_pending(client) & (CEPH_CAP_EXCL|CEPH_CAP_RDCACHE)) == 0 &&
       dn->lock.can_lease())
     mask |= CEPH_LOCK_DN;
-
+  mask |= CEPH_LOCK_DN;
   _issue_client_lease(dn, mask, pool, client, bl, now, session);
   return mask;
 }
index 37e682d432f0fbc60197cef74507cde55d4c982c..a27d43e126d75646b7684b7369eb068b11220cf0 100644 (file)
@@ -1647,15 +1647,26 @@ Message *Rank::Pipe::read_message()
       dout(20) << "reader got data tail " << left << dendl;
     }
   }
-  
-  // unmarshall message
-  dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from " 
-           << env.src << dendl;
 
-  Message *m = decode_message(env, front, data);
+  // footer
+  ceph_msg_footer footer;
+  if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) 
+    return 0;
 
-  
-  return m;
+  dout(10) << "aborted = " << le32_to_cpu(footer.aborted) << dendl;
+  if (le32_to_cpu(footer.aborted)) {
+    dout(0) << "reader got " << front.length() << " + " << data.length()
+            << " byte message from " << env.src << ".. ABORTED" << dendl;
+    // MEH FIXME 
+    Message *m = new MGenericMessage(CEPH_MSG_PING);
+    env.type = cpu_to_le32(CEPH_MSG_PING);
+    m->set_env(env);
+    return m;
+  } 
+
+  dout(20) << "reader got " << front.length() << " + " << data.length()
+          << " byte message from " << env.src << dendl;
+  return decode_message(env, front, data);
 }
 
 
@@ -1780,7 +1791,7 @@ int Rank::Pipe::write_message(Message *m, ceph_msg_header *env,
             << " writing " << donow 
             << dendl;
     
-    if (msg.msg_iovlen >= IOV_MAX-1) {
+    if (msg.msg_iovlen >= IOV_MAX-2) {
       if (do_sendmsg(sd, &msg, msglen)) 
        return -1;      
       
@@ -1806,7 +1817,15 @@ int Rank::Pipe::write_message(Message *m, ceph_msg_header *env,
     }
   }
   assert(left == 0);
-  
+
+  // send footer
+  struct ceph_msg_footer f;
+  memset(&f, 0, sizeof(f));
+  msgvec[msg.msg_iovlen].iov_base = (void*)&f;
+  msgvec[msg.msg_iovlen].iov_len = sizeof(f);
+  msglen += sizeof(f);
+  msg.msg_iovlen++;
+
   // send
   if (do_sendmsg(sd, &msg, msglen)) 
     return -1;