]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
no more bufferlist; streamlined ceph_msg object instead
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Nov 2007 05:27:33 +0000 (05:27 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Nov 2007 05:27:33 +0000 (05:27 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2070 29311d96-e01e-0410-9327-a35deaab8ce9

12 files changed:
trunk/ceph/kernel/client.c
trunk/ceph/kernel/client.h
trunk/ceph/kernel/mds_client.c
trunk/ceph/kernel/mds_client.h
trunk/ceph/kernel/mdsmap.c
trunk/ceph/kernel/mdsmap.h
trunk/ceph/kernel/messenger.c
trunk/ceph/kernel/messenger.h
trunk/ceph/kernel/mon_client.h
trunk/ceph/kernel/monmap.h
trunk/ceph/kernel/osd_client.h
trunk/ceph/kernel/super.c

index 89538a19695df7c4dd89fcca54b03eff9bfb0680..fb9dfdf7b2f475d4612d25cdb36da20deed942d0 100644 (file)
@@ -39,7 +39,7 @@ static int mount(struct ceph_client *client, struct ceph_mount_args *args)
        int ret;
        int attempts = 10;
        
-       atomic_set(&client->mounting, 1);
+       client->mounting = 7;
 
        /* send mount request */
        mount_msg = ceph_new_message(CEPH_MSG_CLIENT_MOUNT, 0);
@@ -50,13 +50,13 @@ trymount:
        inst.name.type = CEPH_ENTITY_TYPE_MON;
        inst.name.num = get_random_int() % args->num_mon;
        inst.addr = args->mon_addr[inst.name.num];
-       dout(1, "ceph_get_client requesting mount from mon%d, %d attempts left\n", 
+       dout(1, "mount from mon%d, %d attempts left\n", 
             inst.name.num, attempts);
        ceph_messenger_send(client->msgr, mount_msg, &inst);
 
        /* wait */
        err = wait_event_interruptible_timeout(client->mounted_wq, 
-                                              atomic_read(&client->mounting) == 0,
+                                              client->mounting == 0,
                                               6*HZ);
        if (err == -EINTR)
                return err; 
@@ -85,9 +85,13 @@ static void handle_mon_map(struct ceph_client *client, struct ceph_message *msg)
        if (err != 0) 
                return;
        
-       /* mounted! */
-       client->whoami = msg->dst.name.num;
-       if (atomic_dec_and_test(&client->mounting))
+       if (client->whoami < 0) {
+               client->whoami = msg->dst.name.num;
+               client->msgr->inst.name = msg->dst.name;
+       }
+
+       clear_bit(4, &client->mounting);
+       if (client->mounting == 0)
                wake_up(&client->mount_wq);
 }
 
index 40a850b43b9146b0bafe78da5f9f8b48555cdba8..0e6c62bff3d637eea3fd239e1d94aae59225bc42 100644 (file)
@@ -39,7 +39,7 @@ struct ceph_client {
        struct ceph_fsid fsid;
        atomic_t nref;
 
-       atomic_t mounting;
+       int mounting;   /* map bitset; 4=mon, 2=mds, 1=osd map */
        wait_queue_t mount_wq;
 
        struct ceph_messenger *msgr;   /* messenger instance */
index 06dfe57dfcab72239eef59b31068197400f0cbe0..9784edd972c0ab8050e35e39d9eb29b5b8587855 100644 (file)
@@ -6,12 +6,12 @@
 #include "messenger.h"
 
 
-static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds)
+static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
 {
        msg->hdr.dst.addr = *ceph_mdsmap_get_addr(mdsc->mdsmap, mds);
        msg->hdr.dst.name.type = CEPH_ENTITY_TYPE_MDS;
        msg->hdr.dst.name.num = mds;
-       ceph_messenger_send(mdsc->client->msgr, msg);
+       ceph_msg_send(mdsc->client->msgr, msg);
 }
 
 
@@ -26,7 +26,7 @@ static void get_request(struct ceph_mds_request *req)
 static void put_request(struct ceph_mds_request *req)
 {
        if (atomic_dec_and_test(&req->r_ref)) {
-               ceph_put_msg(req->r_request);
+               ceph_msg_put(req->r_request);
                kfree(req);
        } 
 }
@@ -36,14 +36,14 @@ static void put_request(struct ceph_mds_request *req)
  * register an in-flight request
  */
 static struct ceph_mds_request *
-register_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds)
+register_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
 {
        struct ceph_mds_request *req;
 
        req = kmalloc(sizeof(*req), GFP_KERNEL);
 
        req->r_request = msg;
-       ceph_get_msg(msg);  /* grab reference */
+       ceph_msg_get(msg);  /* grab reference */
        req->r_reply = 0;
        req->r_num_mds = 0;
        req->r_attempts = 0;
@@ -116,23 +116,26 @@ static void unregister_session(struct ceph_mds_client *mdsc, int mds)
        mdsc->sessions[mds] = 0;
 }
 
-static struct ceph_message *create_session_msg(__u32 op, __u64 seq)
+static struct ceph_msg *create_session_msg(__u32 op, __u64 seq)
 {
-       struct ceph_message *msg;
+       struct ceph_msg *msg;
+       void *p;
 
-       msg = ceph_new_message(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64));
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(__u32)+sizeof(__u64), 0, 0);
        if (IS_ERR(msg))
                return ERR_PTR(-ENOMEM);  /* fixme */
-       op = cpu_to_le32(op);
-       ceph_bl_append_copy(&msg->payload, &op, sizeof(op));
-       seq = cpu_to_le64(op);
-       ceph_bl_append_copy(&msg->payload, &seq, sizeof(seq));
+       p = msg->front.iov_base;
+       *(__le32*)p = cpu_to_le32(op);
+       p += sizeof(__le32);
+       *(__le64*)p = cpu_to_le64(seq);
+       p += sizeof(__le64);
+
        return msg;
 }
 
 static void open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int mds)
 {
-       struct ceph_message *msg;
+       struct ceph_msg *msg;
 
        /* connect */
        if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
@@ -147,19 +150,20 @@ static void open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *
        send_msg_mds(mdsc, msg, mds);
 }
 
-void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_message *msg)
+void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        __u32 op;
        __u64 seq;
        int err;
        struct ceph_mds_session *session;
-       struct ceph_bufferlist_iterator bli = {0, 0};
        int from = msg->hdr.src.name.num;
-       
+       void *p = msg->front.iov_base;
+       void *end = msg->front.iov_base + msg->front.iov_len;
+
        /* decode */
-       if ((err = ceph_bl_decode_32(&msg->payload, &bli, &op)) != 0)
+       if ((err = ceph_decode_32(&p, end, &op)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_64(&msg->payload, &bli, &seq)) != 0)
+       if ((err = ceph_decode_64(&p, end, &seq)) != 0)
                goto bad;
        
        /* handle */
@@ -194,7 +198,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_message
        spin_unlock(&mdsc->lock);
 
 out:
-       ceph_put_msg(msg);
+       ceph_msg_put(msg);
        return;
        
 bad:
@@ -225,12 +229,12 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
 }
 
 
-struct ceph_message *
-ceph_mdsc_make_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds)
+struct ceph_msg *
+ceph_mdsc_make_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
 {
        struct ceph_mds_request *req;
        struct ceph_mds_session *session;
-       struct ceph_message *reply = 0;
+       struct ceph_msg *reply = 0;
 
        spin_lock(&mdsc->lock);
        req = register_request(mdsc, msg, mds);
@@ -289,7 +293,7 @@ retry:
 }
 
 
-void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_message *msg)
+void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        struct ceph_mds_request *req;
        __u64 tid;
@@ -314,21 +318,22 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_message *m
        put_request(req);
 }
 
-void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message *msg)
+void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        struct ceph_mds_request *req;
        __u64 tid;
        __u32 next_mds;
        __u32 fwd_seq;
        int err;
-       struct ceph_bufferlist_iterator bli = {0, 0};
+       void *p = msg->front.iov_base;
+       void *end = p + msg->front.iov_len;
        
        /* decode */
-       if ((err = ceph_bl_decode_64(&msg->payload, &bli, &tid)) != 0)
+       if ((err = ceph_decode_64(&p, end, &tid)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(&msg->payload, &bli, &next_mds)) != 0)
+       if ((err = ceph_decode_32(&p, end, &next_mds)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(&msg->payload, &bli, &fwd_seq)) != 0)
+       if ((err = ceph_decode_32(&p, end, &fwd_seq)) != 0)
                goto bad;
 
        /* handle */
@@ -365,7 +370,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message
        put_request(req);
 
 out:
-       ceph_put_msg(msg);
+       ceph_msg_put(msg);
        return;
 
 bad:
@@ -374,25 +379,24 @@ bad:
 }
 
 
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, 
-                         struct ceph_message *msg)
+void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
-       struct ceph_bufferlist_iterator bli;
        __u64 epoch;
        __u32 left;
        int err;
+       void *p = msg->front.iov_base;
+       void *end = p + msg->front.iov_len;
 
-       ceph_bl_iterator_init(&bli);
-       if ((err = ceph_bl_decode_64(&msg->payload, &bli, &epoch)) != 0)
+       if ((err = ceph_decode_64(&p, end, &epoch)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(&msg->payload, &bli, &left)) != 0)
+       if ((err = ceph_decode_32(&p, end, &left)) != 0)
                goto bad;
 
        dout(2, "ceph_mdsc_handle_map epoch %llu\n", epoch);
 
        spin_lock(&mdsc->lock);
        if (epoch > mdsc->mdsmap->m_epoch) {
-               ceph_mdsmap_decode(mdsc->mdsmap, &msg->payload, &bli);
+               ceph_mdsmap_decode(mdsc->mdsmap, &p, end);
                spin_unlock(&mdsc->lock);
                complete(&mdsc->map_waiters);
        } else {
@@ -400,7 +404,7 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
        }
 
 out:
-       ceph_put_msg(msg);
+       ceph_msg_put(msg);
        return;
 bad:
        dout(1, "corrupt map\n");
index f23c1814b16d6bc3c70bf95f1d6a49a8698d6a5d..67f3e9188b739ee6a66b7c4fa0286f709db68e4d 100644 (file)
@@ -29,8 +29,8 @@ struct ceph_mds_session {
 
 struct ceph_mds_request {
        __u64 r_tid;
-       struct ceph_message *r_request;
-       struct ceph_message *r_reply;
+       struct ceph_msg *r_request;
+       struct ceph_msg *r_reply;
        
        __u32 r_mds[4];      /* set of mds's with whom request may be outstanding */
         int r_num_mds;       /* items in r_mds */
@@ -63,9 +63,9 @@ struct ceph_mds_client {
 
 extern void ceph_mdsc_init(struct ceph_mds_client *mdsc,
                           struct ceph_client *client);
-extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct ceph_message *msg, int mds);
-extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_message *msg);
-extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_message *msg);
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_message *msg);
+extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds);
+extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
+extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
+extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
 
 #endif
index f0990c1178689bf23927711fae70c9eacc2e8309..ab38162e7bf5f016497ab0307141fc2fd906b493 100644 (file)
@@ -44,27 +44,25 @@ struct ceph_entity_addr *ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
        return &m->m_addr[w];
 }
 
-int ceph_mdsmap_decode(struct ceph_mdsmap *m, 
-                      struct ceph_bufferlist *bl, 
-                      struct ceph_bufferlist_iterator *bli)
+int ceph_mdsmap_decode(struct ceph_mdsmap *m, void **p, void *end)
 {
        int i, n;
        __u32 mds;
        int err;
        
-       if ((err = ceph_bl_decode_64(bl, bli, &m->m_epoch)) != 0)
+       if ((err = ceph_decode_64(p, end, &m->m_epoch)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_64(bl, bli, &m->m_client_epoch)) != 0)
+       if ((err = ceph_decode_64(p, end, &m->m_client_epoch)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(bl, bli, &m->m_created.tv_sec)) != 0)
+       if ((err = ceph_decode_32(p, end, &m->m_created.tv_sec)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(bl, bli, &m->m_created.tv_usec)) != 0)
+       if ((err = ceph_decode_32(p, end, &m->m_created.tv_usec)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(bl, bli, &m->m_anchortable)) != 0)
+       if ((err = ceph_decode_32(p, end, &m->m_anchortable)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(bl, bli, &m->m_root)) != 0)
+       if ((err = ceph_decode_32(p, end, &m->m_root)) != 0)
                goto bad;
-       if ((err = ceph_bl_decode_32(bl, bli, &m->m_max_mds)) != 0)
+       if ((err = ceph_decode_32(p, end, &m->m_max_mds)) != 0)
                goto bad;
 
        m->m_addr = kmalloc(m->m_max_mds*sizeof(*m->m_addr), GFP_KERNEL);
@@ -72,28 +70,28 @@ int ceph_mdsmap_decode(struct ceph_mdsmap *m,
        memset(m->m_state, 0, m->m_max_mds);
        
        /* state */
-       if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0)
+       if ((err = ceph_decode_32(p, end, &n)) != 0)
                goto bad;
        for (i=0; i<n; i++) {
-               if ((err = ceph_bl_decode_32(bl, bli, &mds)) != 0)
+               if ((err = ceph_decode_32(p, end, &mds)) != 0)
                        goto bad;
-               if ((err = ceph_bl_decode_32(bl, bli, &m->m_state[mds])) != 0)
+               if ((err = ceph_decode_32(p, end, &m->m_state[mds])) != 0)
                        goto bad;
        }
 
        /* state_seq */
-       if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0)
+       if ((err = ceph_decode_32(p, end, &n)) != 0)
                goto bad;
-       ceph_bl_iterator_advance(bl, bli, n*(sizeof(__u32)+sizeof(__u64)));
+       *p += n*(sizeof(__u32)+sizeof(__u64));
        
        /* mds_inst */
-       if ((err = ceph_bl_decode_32(bl, bli, &n)) != 0)
+       if ((err = ceph_decode_32(p, end, &n)) != 0)
                goto bad;
        for (i=0; i<n; i++) {
-               if ((err = ceph_bl_decode_32(bl, bli, &mds)) != 0)
+               if ((err = ceph_decode_32(p, end, &mds)) != 0)
                        goto bad;
-               ceph_bl_iterator_advance(bl, bli, sizeof(struct ceph_entity_name));
-               if ((err = ceph_bl_decode_addr(bl, bli, &m->m_addr[mds])) != 0)
+               *p += sizeof(struct ceph_entity_name);
+               if ((err = ceph_decode_addr(p, end, &m->m_addr[mds])) != 0)
                        goto bad;
        }
 
index 99396c0558665e8af49faf792aeda621f833b8e1..62dd35bde99a8eb6366c69c4b219657f74281722 100644 (file)
@@ -2,7 +2,6 @@
 #define _FS_CEPH_MDSMAP_H
 
 #include <linux/ceph_fs.h>
-#include "bufferlist.h"
 
 /* see mds/MDSMap.h */
 #define CEPH_MDS_STATE_DNE         0  /* down, never existed. */
@@ -40,6 +39,7 @@ extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
 extern int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w);
 extern struct ceph_entity_addr *ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w);
 
-extern int ceph_mdsmap_decode(struct ceph_mdsmap *m, struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli);
+extern int ceph_mdsmap_decode(struct ceph_mdsmap *m, void **p, void *end);
+
 
 #endif
index e2fd9e3f310b61bc7a250711a1cc662a85f987ac..a922e13cc8b1456290c755076fa78680cdce9e88 100644 (file)
@@ -2,9 +2,9 @@
 #include <linux/socket.h>
 #include <linux/net.h>
 #include <linux/string.h>
-#include <net/tcp.h>
-
+#include <linux/highmem.h>
 #include <linux/ceph_fs.h>
+#include <net/tcp.h>
 #include "messenger.h"
 #include "ktcp.h"
 
@@ -19,6 +19,31 @@ static void try_read(struct work_struct *);
 static void try_write(struct work_struct *);
 static void try_accept(struct work_struct *);
 
+
+
+/*
+ * calculate the number of pages a given length and offset map onto,
+ * if we align the data.
+ */
+static int calc_pages_for(int len, int off)
+{
+       int nr = 0;
+       if (len == 0) 
+               return 0;
+       if (off + len < PAGE_SIZE)
+               return 1;
+       if (off) {
+               nr++;
+               len -= off;
+       }
+       nr += len >> PAGE_SHIFT;
+       if (len & PAGE_MASK)
+               nr++;
+       return nr;
+}
+
+
+
 /*
  * connections
  */
@@ -29,10 +54,9 @@ static void try_accept(struct work_struct *);
 static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
 {
        struct ceph_connection *con;
-       con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+       con = kzalloc(sizeof(struct ceph_connection), GFP_KERNEL);
        if (con == NULL) 
                return NULL;
-       memset(con, 0, sizeof(*con));
 
        con->msgr = msgr;
 
@@ -199,61 +223,89 @@ static int do_connect(struct ceph_connection *con)
 
 /*
  * write as much of con->out_partial to the socket as we can.
- *  1 -> done; and cleaned up out_partial
+ *  1 -> done
  *  0 -> socket full, but more to do
  * <0 -> error
  */
-static int write_partial(struct ceph_connection *con)
+static int write_partial_kvec(struct ceph_connection *con)
 {
-       struct ceph_bufferlist *bl = &con->out_partial;
-       struct ceph_bufferlist_iterator *p = &con->out_pos;
-       int len, ret;
+       int ret;
 
-more:
-       len = bl->b_kv[p->i_kv].iov_len - p->i_off;
-       /* FIXME */
-       /* ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len); */
-       if (ret < 0) return ret;
-       if (ret == 0) return 0;   /* socket full */
-       if (ret + p->i_off == bl->b_kv[p->i_kv].iov_len) {
-               p->i_kv++;
-               p->i_off = 0;
-               if (p->i_kv == bl->b_kvlen) 
-                       return 1;
-       } else {
-               p->i_off += ret;
+       while (con->out_kvec_bytes > 0) {
+               ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes, 0);
+               if (ret < 0) return ret;  /* error */
+               if (ret == 0) return 0;   /* socket full */
+               con->out_kvec_bytes -= ret;
+               if (con->out_kvec_bytes == 0)
+                       break;            /* done */
+               while (ret > 0) {
+                       if (ret >= con->out_kvec_cur->iov_len) {
+                               ret -= con->out_kvec_cur->iov_len;
+                               con->out_kvec_cur++;
+                       } else {
+                               con->out_kvec_cur->iov_len -= ret;
+                               con->out_kvec_cur->iov_base += ret;
+                               ret = 0;
+                               break;
+                       }
+               }
        }
-       goto more;
+       con->out_kvec_left = 0;
+       return 1;  /* done! */
 }
 
+static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       struct kvec kv;
+       int ret;
+
+       while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+               kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos;
+               kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 
+                                (int)(msg->hdr.data_len - con->out_msg_pos.data_pos));
+               ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len, 0);
+               if (ret < 0) return ret;
+               if (ret == 0) return 0;   /* socket full */
+               con->out_msg_pos.data_pos += ret;
+               con->out_msg_pos.page_pos += ret;
+               if (ret == kv.iov_len) {
+                       con->out_msg_pos.page_pos = 0;
+                       con->out_msg_pos.page++;
+               }               
+       }
+
+       /* done */
+       con->out_msg = 0;
+       return 1;
+}
+
+
 /*
  * build out_partial based on the next outgoing message in the queue.
  */
 static void prepare_write_message(struct ceph_connection *con)
 {
-       struct ceph_message *m = list_entry(con->out_queue.next, struct ceph_message, list_head);
-       int i;
+       struct ceph_msg *m = list_entry(con->out_queue.next, struct ceph_msg, list_head);
 
        /* move to sending/sent list */
        list_del(&m->list_head);
        list_add(&m->list_head, &con->out_sent);
-       
-       ceph_bl_init(&con->out_partial);  
-       ceph_bl_iterator_init(&con->out_pos);
-
-       /* always one chunk, for now */
-       m->hdr.nchunks = 1;  
-       m->chunklens[0] = m->payload.b_len;
-
-       /* tag + header */
-       ceph_bl_append_ref(&con->out_partial, &tag_msg, 1);
-       ceph_bl_append_ref(&con->out_partial, &m->hdr, sizeof(m->hdr));
-       
-       /* payload */
-       ceph_bl_append_ref(&con->out_partial, &m->chunklens[0], sizeof(__u32));
-       for (i=0; i<m->payload.b_kvlen; i++) 
-               ceph_bl_append_ref(&con->out_partial, m->payload.b_kv[i].iov_base, 
-                                  m->payload.b_kv[i].iov_len);
+       con->out_msg = m;
+
+       /* tag + hdr + front */
+       con->out_kvec[0].iov_base = &tag_msg;
+       con->out_kvec[0].iov_len = 1;
+       con->out_kvec[1].iov_base = &m->hdr;
+       con->out_kvec[1].iov_len = sizeof(m->hdr);
+       con->out_kvec[2] = m->front;
+       con->out_kvec_left = 3;
+       con->out_kvec_bytes = 1 + sizeof(m->hdr) + m->front.iov_len;
+       con->out_kvec_cur = con->out_kvec;
+
+       /* pages */
+       con->out_msg_pos.page = 0;
+       con->out_msg_pos.page_pos = m->hdr.data_off & PAGE_MASK;
+       con->out_msg_pos.data_pos = 0;
 }
 
 /* 
@@ -262,32 +314,42 @@ static void prepare_write_message(struct ceph_connection *con)
 static void prepare_write_ack(struct ceph_connection *con)
 {
        con->in_seq_acked = con->in_seq;
-       
-       ceph_bl_init(&con->out_partial);  
-       ceph_bl_iterator_init(&con->out_pos);
-       ceph_bl_append_ref(&con->out_partial, &tag_ack, 1);
-       ceph_bl_append_ref(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked));
+
+       con->out_kvec[0].iov_base = &tag_ack;
+       con->out_kvec[0].iov_len = 1;
+       con->out_kvec[1].iov_base = &con->in_seq_acked;
+       con->out_kvec[1].iov_len = sizeof(con->in_seq_acked);
+       con->out_kvec_left = 2;
+       con->out_kvec_bytes = 1 + sizeof(con->in_seq_acked);
+       con->out_kvec_cur = con->out_kvec;
 }
 
 static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con)
 {
-       ceph_bl_init(&con->out_partial);  
-       ceph_bl_iterator_init(&con->out_pos);
-       ceph_bl_append_ref(&con->out_partial, &msgr->addr, sizeof(msgr->addr));
+       con->out_kvec[0].iov_base = &msgr->inst.addr;
+       con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
+       con->out_kvec_left = 1;
+       con->out_kvec_bytes = sizeof(msgr->inst.addr);
+       con->out_kvec_cur = con->out_kvec;
 }
 
 static void prepare_write_accept_ready(struct ceph_connection *con)
 {
-       ceph_bl_init(&con->out_partial);  
-       ceph_bl_iterator_init(&con->out_pos);
-       ceph_bl_append_ref(&con->out_partial, &tag_ready, 1);
+       con->out_kvec[0].iov_base = &tag_ready;
+       con->out_kvec[0].iov_len = 1;
+       con->out_kvec_left = 1;
+       con->out_kvec_bytes = 1;
+       con->out_kvec_cur = con->out_kvec;
 }
 static void prepare_write_accept_reject(struct ceph_connection *con)
 {
-       ceph_bl_init(&con->out_partial);  
-       ceph_bl_iterator_init(&con->out_pos);
-       ceph_bl_append_ref(&con->out_partial, &tag_reject, 1);
-       ceph_bl_append_ref(&con->out_partial, &con->connect_seq, sizeof(con->connect_seq));
+       con->out_kvec[0].iov_base = &tag_reject;
+       con->out_kvec[0].iov_len = 1;
+       con->out_kvec[1].iov_base = &con->connect_seq;
+       con->out_kvec[1].iov_len = sizeof(con->connect_seq);
+       con->out_kvec_left = 2;
+       con->out_kvec_bytes = 1 + sizeof(con->connect_seq);
+       con->out_kvec_cur = con->out_kvec;
 }
 
 /*
@@ -295,24 +357,20 @@ static void prepare_write_accept_reject(struct ceph_connection *con)
  */
 static void try_write(struct work_struct *work)
 {
-       int ret;
        struct ceph_connection *con;
        struct ceph_messenger *msgr;
+       int ret;
 
        con = container_of(work, struct ceph_connection, swork);
        msgr = con->msgr;
 
 more:
-       /* data queued? */
-       if (con->out_partial.b_kvlen) {
-               ret = write_partial(con);
-               if (ret == 0) goto done;
-
-               /* error or success */
-               /* clean up */
-               ceph_bl_init(&con->out_partial);  
-               ceph_bl_iterator_init(&con->out_pos);
-
+       /* kvec data queued? */
+       if (con->out_kvec_left) {
+               ret = write_partial_kvec(con);
+               if (ret == 0) 
+                       goto done;
+               
                if (con->state == REJECTING) {
                        /* FIXME do something else here, pbly? */
                        remove_connection(msgr, con);
@@ -321,7 +379,15 @@ more:
                }
                
                /* TBD: handle error; return for now */
-               if (ret < 0) goto done; /* error */
+               if (ret < 0) 
+                       goto done; /* error */
+       }
+
+       /* msg pages? */
+       if (con->out_msg) {
+               ret = write_partial_msg_pages(con, con->out_msg);
+               if (ret == 0) 
+                       goto done;
        }
        
        /* anything else pending? */
@@ -340,50 +406,96 @@ done:
 }
 
 
+/* 
+ * prepare to read a message
+ */
+static int prepare_read_message(struct ceph_connection *con)
+{
+       con->in_tag = CEPH_MSGR_TAG_MSG;
+       con->in_base_pos = 0;
+       con->in_msg = kzalloc(sizeof(*con->in_msg), GFP_KERNEL);
+       if (con->in_msg == NULL) {
+               /* TBD: we don't check for error in caller, handle error */
+               derr(1, "kmalloc failure on incoming message\n");
+               return -ENOMEM;
+       }
+       
+       ceph_msg_get(con->in_msg);
+       return 0;
+}
+
 /*
  * read (part of) a message
  */
 static int read_message_partial(struct ceph_connection *con)
 {
-       struct ceph_message *m = con->in_partial;
-       int ret, s, chunkbytes, c, did;
-       size_t left;
+       struct ceph_msg *m = con->in_msg;
+       void *p;
+       int ret;
+       int want, left;
 
-       while (con->in_base_pos < sizeof(struct ceph_message_header)) {
-               left = sizeof(struct ceph_message_header) - con->in_base_pos;
+       /* header */
+       while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
+               left = sizeof(struct ceph_msg_header) - con->in_base_pos;
                ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
-       if (m->hdr.nchunks == 0) return 1; /* done */
 
-       chunkbytes = sizeof(__u32)*m->hdr.nchunks;
-       while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) {
-               int off = con->in_base_pos - sizeof(struct ceph_message_header);
-               left = chunkbytes + sizeof(struct ceph_message_header) - con->in_base_pos;
-               ret = _krecvmsg(con->sock, (char*)m->chunklens + off, left, 0);
+       /* front */
+       if (m->front.iov_len < m->hdr.front_len) {
+               if (m->front.iov_base == NULL) {
+                       m->front.iov_base = kmalloc(m->hdr.front_len, GFP_KERNEL);
+                       if (m->front.iov_base == NULL)
+                               return -ENOMEM;
+               }
+               left = m->hdr.front_len - m->front.iov_len;
+               ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left, 0);
                if (ret <= 0) return ret;
-               con->in_base_pos += ret;
+               m->front.iov_len += ret;
        }
-       
-       did = 0;
-       for (c = 0; c<m->hdr.nchunks; c++) {
-       more:
-               left = did + m->chunklens[c] - m->payload.b_len;
-               if (left <= 0) {
-                       did += m->chunklens[c];
-                       continue;
-               }
-               ceph_bl_prepare_append(&m->payload, left);
-               s = min(m->payload.b_append.iov_len, left);
-               ret = _krecvmsg(con->sock, m->payload.b_append.iov_base, s, 0);
+
+       /* (page) data */
+       if (m->hdr.data_len == 0) 
+               goto done;
+       if (m->nr_pages == 0) {
+               want = calc_pages_for(m->hdr.data_len, m->hdr.data_off);
+               m->pages = kmalloc(want * sizeof(*m->pages), GFP_KERNEL);
+               if (m->pages == NULL)
+                       return -ENOMEM;
+               m->nr_pages = want;
+               con->in_msg_pos.page = 0;
+               con->in_msg_pos.page_pos = m->hdr.data_off;
+               con->in_msg_pos.data_pos = 0;
+       }
+       while (con->in_msg_pos.data_pos < m->hdr.data_len) {
+               left = min((int)(m->hdr.data_len - con->in_msg_pos.data_pos),
+                          (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+               p = kmap(m->pages[con->in_msg_pos.page]);
+               ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left, 0);
                if (ret <= 0) return ret;
-               ceph_bl_append_copied(&m->payload, s);
-               goto more;
+               con->in_msg_pos.data_pos += ret;
+               con->in_msg_pos.page_pos += ret;
+               if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+                       con->in_msg_pos.page_pos = 0;
+                       con->in_msg_pos.page++;
+               }
        }
+
+done:
        return 1; /* done! */
 }
 
+
+/* 
+ * prepare to read an ack
+ */
+static void prepare_read_ack(struct ceph_connection *con)
+{
+       con->in_tag = CEPH_MSGR_TAG_ACK;
+       con->in_base_pos = 0;
+}
+
 /*
  * read (part of) an ack
  */
@@ -398,7 +510,22 @@ static int read_ack_partial(struct ceph_connection *con)
        return 1; /* done */
 }
 
+static void process_ack(struct ceph_connection *con, __u32 ack)
+{
+       struct ceph_msg *m;
+       while (!list_empty(&con->out_sent)) {
+               m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
+               if (m->hdr.seq > ack) break;
+               dout(5, "got ack for %d type %d at %p\n", m->hdr.seq, m->hdr.type, m);
+               list_del(&m->list_head);
+               ceph_msg_put(m);
+       }
+}
+
 
+/*
+ * read portion of handshake on a newly accepted connection
+ */
 static int read_accept_partial(struct ceph_connection *con)
 {
        int ret;
@@ -422,48 +549,6 @@ static int read_accept_partial(struct ceph_connection *con)
        return 1; /* done */
 }
 
-/* 
- * prepare to read a message
- */
-static void prepare_read_message(struct ceph_connection *con)
-{
-       con->in_tag = CEPH_MSGR_TAG_MSG;
-       con->in_base_pos = 0;
-       con->in_partial = kmalloc(sizeof(struct ceph_message), GFP_KERNEL);
-       if (con->in_partial == NULL) {
-               /* TBD: we don't check for error in caller, handle error */
-               printk(KERN_INFO "malloc failure\n");
-               goto done;
-       }
-
-       ceph_get_msg(con->in_partial);
-       ceph_bl_init(&con->in_partial->payload);
-       ceph_bl_iterator_init(&con->in_pos);
-done:
-       return;
-}
-
-/* 
- * prepare to read an ack
- */
-static void prepare_read_ack(struct ceph_connection *con)
-{
-       con->in_tag = CEPH_MSGR_TAG_ACK;
-       con->in_base_pos = 0;
-}
-
-static void process_ack(struct ceph_connection *con, __u32 ack)
-{
-       struct ceph_message *m;
-       while (!list_empty(&con->out_sent)) {
-               m = list_entry(con->out_sent.next, struct ceph_message, list_head);
-               if (m->hdr.seq > ack) break;
-               dout(5, "got ack for %d type %d at %p\n", m->hdr.seq, m->hdr.type, m);
-               list_del(&m->list_head);
-               ceph_put_msg(m);
-       }
-}
-
 /*
  * call after a new connection's handshake has completed
  */
@@ -476,7 +561,7 @@ static void process_accept(struct ceph_connection *con)
        existing = get_connection(con->msgr, &con->peer_addr);
        if (existing) {
                spin_lock(&existing->con_lock);
-               if ((existing->state == CONNECTING && compare_addr(&con->msgr->addr, &con->peer_addr)) ||
+               if ((existing->state == CONNECTING && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
                    (existing->state == OPEN && con->connect_seq == existing->connect_seq)) {
                        /* replace existing with new connection */
                        replace_connection(con->msgr, existing, con);
@@ -556,9 +641,9 @@ more:
                /* TBD: do something with error */
                if (ret <= 0) goto done;
                /* got a full message! */
-               msgr->dispatch(con->msgr->parent, con->in_partial);
-               ceph_put_msg(con->in_partial);
-               con->in_partial = 0;
+               msgr->dispatch(con->msgr->parent, con->in_msg);
+               ceph_msg_put(con->in_msg);
+               con->in_msg = 0;
                con->in_tag = CEPH_MSGR_TAG_READY;
                goto more;
        }
@@ -593,20 +678,19 @@ static void try_accept(struct work_struct *work)
        msgr = container_of(work, struct ceph_messenger, awork);
        sock = msgr->listen_sock;
 
-
-        printk(KERN_INFO "Entered try_accept\n");
+        dout(5, "Entered try_accept\n");
 
 
         if (kernel_accept(sock, &new_sock, sock->file->f_flags) < 0) {
-               printk(KERN_INFO "error accepting connection \n");
+               derr(1, "error accepting connection\n");
                 goto done;
         }
-        printk(KERN_INFO "accepted connection \n");
+       dout(5, "accepted connection \n");
 
         /* get the address at the other end */
         memset(&saddr, 0, sizeof(saddr));
         if (new_sock->ops->getname(new_sock, (struct sockaddr *)&saddr, &len, 2)) {
-                printk(KERN_INFO "getname error connection aborted\n");
+                derr(1, "getname error connection aborted\n");
                 sock_release(new_sock);
                 goto done;
         }
@@ -614,7 +698,7 @@ static void try_accept(struct work_struct *work)
        /* initialize the msgr connection */
        new_con = new_connection(msgr);
        if (new_con == NULL) {
-                       printk(KERN_INFO "malloc failure\n");
+                       derr(1, "malloc failure\n");
                sock_release(new_sock);
                goto done;
                }
@@ -642,10 +726,9 @@ struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr)
 {
         struct ceph_messenger *msgr;
 
-        msgr = kmalloc(sizeof(*msgr), GFP_KERNEL);
+        msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
         if (msgr == NULL) 
                return NULL;
-        memset(msgr, 0, sizeof(*msgr));
 
        spin_lock_init(&msgr->con_lock);
 
@@ -663,11 +746,16 @@ struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr)
 
 /*
  * queue up an outgoing message
+ *
+ * will take+drop msgr, then connection locks.
  */
-int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg)
+int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
 {
        struct ceph_connection *con;
        
+       /* set source */
+       msg->hdr.src = msgr->inst;
+
        /* do we have the connection? */
        spin_lock(&msgr->con_lock);
        con = get_connection(msgr, &msg->hdr.dst.addr);
@@ -676,13 +764,15 @@ int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg)
                if (IS_ERR(con))
                        return PTR_ERR(con);
                dout(5, "opening new connection to peer %x:%d\n",
-                    ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), msg->hdr.dst.addr.ipaddr.sin_port);
+                    ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), 
+                    msg->hdr.dst.addr.ipaddr.sin_port);
                con->peer_addr = msg->hdr.dst.addr;
                con->state = CONNECTING;
                add_connection(msgr, con);
        } else {
                dout(5, "had connection to peer %x:%d\n",
-                    msg->hdr.dst.addr.ipaddr.sin_addr.s_addr, msg->hdr.dst.addr.ipaddr.sin_port);
+                    msg->hdr.dst.addr.ipaddr.sin_addr.s_addr,
+                    msg->hdr.dst.addr.ipaddr.sin_port);
        }                    
        spin_unlock(&msgr->con_lock);
 
@@ -695,43 +785,68 @@ int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg)
        /* queue */
        dout(1, "queuing outgoing message for %s.%d\n",
             ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num);
-       ceph_get_msg(msg);
+       ceph_msg_get(msg);
        list_add(&con->out_queue, &msg->list_head);
-
+       
        spin_unlock(&con->con_lock);
        put_connection(con);
+       return 0;
 }
 
 
 
-struct ceph_message *ceph_new_message(int type, int size)
+struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off)
 {
-       struct ceph_message *m;
+       struct ceph_msg *m;
+       int i;
 
-       m = kmalloc(sizeof(*m), GFP_KERNEL);
+       m = kzalloc(sizeof(*m), GFP_KERNEL);
        if (m == NULL)
-               return ERR_PTR(-ENOMEM);
-       memset(m, 0, sizeof(*m));
+               goto out;
+       atomic_set(&m->nref, 1);
        m->hdr.type = type;
-       
-       if (size) {
-               BUG_ON(size);  /* implement me */
+       m->hdr.front_len = front_len;
+       m->hdr.data_len = page_len;
+       m->hdr.data_off = page_off;
+
+       /* front */
+       m->front.iov_base = kmalloc(front_len, GFP_KERNEL);
+       if (m->front.iov_base == NULL)
+               goto out2;
+       m->front.iov_len = front_len;
+
+       /* pages */
+       m->nr_pages = calc_pages_for(page_len, page_off);
+       if (m->nr_pages) {
+               m->pages = kzalloc(m->nr_pages*sizeof(*m->pages), GFP_KERNEL);
+               for (i=0; i<m->nr_pages; i++) {
+                       m->pages[i] = alloc_page(GFP_KERNEL);
+                       if (m->pages[i] == NULL)
+                               goto out2;
+               }
        }
-
        return m;
-}
-
 
+out2:
+       ceph_msg_put(m);
+out:
+       return ERR_PTR(-ENOMEM);
+}
 
-int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v)
+void ceph_msg_put(struct ceph_msg *m)
 {
-       int err;
-       if (!ceph_bl_decode_have(bl, bli, sizeof(*v)))
-               return -EINVAL;
-       if ((err = ceph_bl_decode_32(bl, bli, &v->erank)) != 0)
-               return -EINVAL;
-       if ((err = ceph_bl_decode_32(bl, bli, &v->nonce)) != 0)
-               return -EINVAL;
-       ceph_bl_copy(bl, bli, &v->ipaddr, sizeof(v->ipaddr));
-       return 0;
+       if (atomic_dec_and_test(&m->nref)) {
+               int i;
+               if (m->pages) {
+                       for (i=0; i<m->nr_pages; i++)
+                               if (m->pages[i])
+                                       kfree(m->pages[i]);
+                       kfree(m->pages);
+               }
+               if (m->front.iov_base)
+                       kfree(m->front.iov_base);
+               kfree(m);
+       }
 }
+
+
index 9ea1e07a7605b1d39a8f994197133cb56d677090..d2acc4135feb05fecadfb8438c4c7dc5995750ec 100644 (file)
@@ -6,13 +6,12 @@
 #include <linux/radix-tree.h>
 #include <linux/workqueue.h>
 #include <linux/ceph_fs.h>
-#include "bufferlist.h"
 
-struct ceph_message;
+struct ceph_msg;
 
-typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_message *m);
+typedef void (*ceph_messenger_dispatch_t) (void *p, struct ceph_msg *m);
 
-__inline__ const char *ceph_name_type_str(int t) {
+static __inline__ const char *ceph_name_type_str(int t) {
        switch (t) {
        case CEPH_ENTITY_TYPE_MON: return "mon";
        case CEPH_ENTITY_TYPE_MDS: return "mds";
@@ -29,22 +28,29 @@ struct ceph_messenger {
        ceph_messenger_dispatch_t dispatch;
        struct socket *listen_sock;      /* listening socket */
        struct work_struct awork;        /* accept work */
-       struct ceph_entity_addr addr;    /* my address */
+       struct ceph_entity_inst inst;    /* my name+address */
        spinlock_t con_lock;
        struct list_head con_all;        /* all connections */
        struct list_head con_accepting;  /*  doing handshake, or */
        struct radix_tree_root con_open; /*  established. see get_connection() */
 };
 
-struct ceph_message {
-       struct ceph_message_header hdr; /* header */
-       __u32 chunklens[2];
-       struct ceph_bufferlist payload;
+struct ceph_msg {
+       struct ceph_msg_header hdr;     /* header */
+       struct kvec front;              /* first bit of message */
+       struct page **pages;            /* data payload */
+       unsigned nr_pages;              /* size of page array */
 
        struct list_head list_head;
        atomic_t nref;
 };
 
+struct ceph_msg_pos {
+       int page, page_pos;        /* which page; -3=tag, -2=hdr, -1=front */
+       int data_pos;
+};
+
+
 /* current state of connection */
 enum ceph_connection_state {
        NEW = 1,
@@ -75,16 +81,23 @@ struct ceph_connection {
 
        /* out queue */
        struct list_head out_queue;
-       struct ceph_bufferlist out_partial;  /* refereces existing bufferlists; do not free() */
-       struct ceph_bufferlist_iterator out_pos;
+
+       struct kvec out_kvec[4],
+               *out_kvec_cur;
+       int out_kvec_left;   /* kvec's left */
+       int out_kvec_bytes;  /* bytes left */
+
+       struct ceph_msg *out_msg;
+       struct ceph_msg_pos out_msg_pos;
+
        struct list_head out_sent;   /* sending/sent but unacked; resend if connection drops */
 
        /* partially read message contents */
        char in_tag;       /* READY (accepting, or no in-progress read) or ACK or MSG */
        int in_base_pos;   /* for ack seq, or msg headers, or accept handshake */
        __u32 in_partial_ack;  
-       struct ceph_message *in_partial;
-       struct ceph_bufferlist_iterator in_pos;  /* for msg payload */
+       struct ceph_msg *in_msg;
+       struct ceph_msg_pos in_msg_pos;
 
        struct work_struct rwork;               /* received work */
        struct work_struct swork;               /* send work */
@@ -93,25 +106,55 @@ struct ceph_connection {
 };
 
 
-/* messenger */
-extern int ceph_messenger_send(struct ceph_messenger *msgr, struct ceph_message *msg);
-
+extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off);
+static __inline__ void ceph_msg_get(struct ceph_msg *msg) {
+       atomic_inc(&msg->nref);
+}
+extern void ceph_msg_put(struct ceph_msg *msg);
+extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg);
 
-/* messages */
-extern struct ceph_message *ceph_new_message(int type, int size);
 
-static __inline__ void ceph_put_msg(struct ceph_message *msg) {
-       if (atomic_dec_and_test(&msg->nref)) {
-               ceph_bl_clear(&msg->payload);
-               kfree(msg);
-       }
+static __inline__ int ceph_decode_64(void **p, void *end, __u64 *v) {
+       if (*p + sizeof(v) > end)
+               return -EINVAL;
+       *v = le64_to_cpu(*(__u64*)p);
+       p += sizeof(*v);
+       return 0;
+}
+static __inline__ int ceph_decode_32(void **p, void *end, __u32 *v) {
+       if (*p + sizeof(v) > end)
+               return -EINVAL;
+       *v = le32_to_cpu(*(__u32*)p);
+       p += sizeof(*v);
+       return 0;
+}
+static __inline__ int ceph_decode_16(void **p, void *end, __u16 *v) {
+       if (*p + sizeof(v) > end)
+               return -EINVAL;
+       *v = le16_to_cpu(*(__u16*)p);
+       p += sizeof(*v);
+       return 0;
+}
+static __inline__ int ceph_decode_copy(void **p, void *end, void *v, int len) {
+       if (*p + len > end) 
+               return -EINVAL;
+       memcpy(v, *p, len);
+       *p += len;
+       return 0;
 }
 
-static __inline__ void ceph_get_msg(struct ceph_message *msg) {
-       atomic_inc(&msg->nref);
+static __inline__ int ceph_decode_addr(void **p, void *end, struct ceph_entity_addr *v) {
+       int err;
+       if (*p + sizeof(*v) > end) 
+               return -EINVAL;
+       if ((err = ceph_decode_32(p, end, &v->erank)) != 0)
+               return -EINVAL;
+       if ((err = ceph_decode_32(p, end, &v->nonce)) != 0)
+               return -EINVAL;
+       ceph_decode_copy(p, end, &v->ipaddr, sizeof(v->ipaddr));
+       return 0;
 }
 
 
-extern int ceph_bl_decode_addr(struct ceph_bufferlist *bl, struct ceph_bufferlist_iterator *bli, struct ceph_entity_addr *v);
 
 #endif
index 1e1147e0bfda6faf074ede2943d3bdad9c1c82fc..d44b9c5e687ec250442aabb7acf2292514ae62d5 100644 (file)
@@ -12,7 +12,7 @@ struct ceph_mon_client {
 };
 
 extern void ceph_monc_init(struct ceph_mon_client *monc);
-extern void ceph_monc_handle_monmap(struct ceph_mon_client *monc, struct ceph_message *m);
+extern void ceph_monc_handle_monmap(struct ceph_mon_client *monc, struct ceph_msg *m);
 
 extern void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, __u64 have);
 extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, __u64 have);
index e76a45977b7c95b37f2ee8fa18cc6cd3f68c86d0..5e623b10352d2cc7ad9f2066864d86454ba18599 100644 (file)
@@ -2,7 +2,6 @@
 #define _FS_CEPH_MONMAP_H
 
 #include <linux/ceph_fs.h>
-#include "bufferlist.h"
 
 /*
  * monitor map
@@ -13,6 +12,6 @@ struct ceph_monmap {
   struct ceph_entity_inst *mon_inst;
 };
 
-extern int ceph_monmap_decode(struct ceph_monmap *m, struct ceph_bufferlist *bl);
+extern int ceph_monmap_decode(struct ceph_monmap *m, void **p, void *end);
 
 #endif
index ef99ba2c0a39cdf56ce194843ed35e4ff9f1cb42..6f643646b9427ce7cf1bc8ae7832764c95c7dd5d 100644 (file)
@@ -15,8 +15,8 @@ struct ceph_osd_client {
 };
 
 extern void ceph_osdc_init(struct ceph_osd_client *osdc);
-extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_message *msg);
-extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_message *msg);
+extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg);
+extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg);
 
 #endif
 
index 9a213c4090bd9dd03c337cdc95aba9a77473a168..7eba370f833f5e07cfa4517bb0ef65780139961d 100644 (file)
@@ -135,10 +135,9 @@ static int ceph_set_super(struct super_block *s, void *data)
 
        s->s_flags = args->mntflags;
        
-       sbinfo = kmalloc(sizeof(struct ceph_super_info), GFP_KERNEL);
+       sbinfo = kzalloc(sizeof(struct ceph_super_info), GFP_KERNEL);
        if (!sbinfo)
                return -ENOMEM;
-       memset(sbinfo, 0, sizeof(*sbinfo));
        s->s_fs_info = sbinfo;
 
        memcpy(&sbinfo->mount_args, args, sizeof(*args));
@@ -247,7 +246,7 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru
                token = match_token(c, arg_tokens, argstr);
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
-                       dout(0, "bad mount arg\n");
+                       dout(0, "bad mount arg, not int\n");
                        continue;
                }
                switch (token) {
@@ -264,6 +263,7 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru
                        ceph_debug = intval;
                        break;
                default:
+                       derr(1, "bad mount option %s\n", c);
                        continue;
                }
        }