]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kernel messenger sort of working!
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 29 Nov 2007 00:27:19 +0000 (00:27 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 29 Nov 2007 00:27:19 +0000 (00:27 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2139 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/include/ceph_fs.h
trunk/ceph/kernel/client.c
trunk/ceph/kernel/ktcp.c
trunk/ceph/kernel/mds_client.c
trunk/ceph/kernel/messenger.c
trunk/ceph/kernel/messenger.h
trunk/ceph/kernel/osd_client.c
trunk/ceph/kernel/super.c
trunk/ceph/kernel/super.h

index a60ed84099d31bb16bc7fa0678e7cb2b877da31d..987996a623ee96984e2f3864c7bf2665e67abbb7 100644 (file)
@@ -189,9 +189,9 @@ struct ceph_entity_addr {
        struct sockaddr_in ipaddr;
 };
 
-#define ceph_entity_addr_is_local(a,b)         \
-       ((a).nonce == (b).nonce &&              \
-        (a).ipaddr == (b).ipaddr)
+#define ceph_entity_addr_is_local(a,b)                                 \
+       ((a).nonce == (b).nonce &&                                      \
+        (a).ipaddr.sin_addr.s_addr == (b).ipaddr.sin_addr.s_addr)
 
 #define compare_addr(a, b)                     \
        ((a)->erank == (b)->erank &&            \
index 2c1167845841171c6634e01b1db575647bc26419..93b07fcb4b5d6eae2f06041e6fe29177be6349e2 100644 (file)
@@ -9,7 +9,7 @@
 
 
 /* debug level; defined in include/ceph_fs.h */
-int ceph_debug = 20;
+int ceph_debug = 200;
 
 /*
  * directory of filesystems mounted by this host
@@ -66,7 +66,7 @@ static struct ceph_client *create_client(struct ceph_mount_args *args)
        get_client_counter();
 
        /* messenger */
-       cl->msgr = ceph_messenger_create();
+       cl->msgr = ceph_messenger_create(&args->my_addr);
        if (IS_ERR(cl->msgr)) {
                err = PTR_ERR(cl->msgr);
                goto fail;
@@ -236,7 +236,9 @@ void ceph_put_client(struct ceph_client *cl)
  */
 void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg)
 {
-       dout(5, "dispatch %p type %d\n", (void*)msg, msg->hdr.type);
+       dout(5, "dispatch from %s%d type %d len %d+%d\n",
+            ceph_name_type_str(msg->hdr.src.name.type), msg->hdr.src.name.num,
+            msg->hdr.type, msg->hdr.front_len, msg->hdr.data_len);
 
        /* deliver the message */
        switch (msg->hdr.type) {
index 4dd6c3235ae0f402e773cddd6d1f2390f4a7a5db..c5c9189eb62fc88c661a6ac1496cf63875281efb 100644 (file)
@@ -17,13 +17,14 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
         struct ceph_connection *con;
         struct ceph_messenger *msgr;
 
-        printk(KERN_INFO "Entered ceph_data_ready \n");
-
        if (sk->sk_state == TCP_LISTEN) {
                msgr = (struct ceph_messenger *)sk->sk_user_data;
+               dout(30, "ceph_data_ready listener %p\n", msgr);
                queue_work(recv_wq, &msgr->awork);
        } else {
                con = (struct ceph_connection *)sk->sk_user_data;
+               dout(30, "ceph_data_ready connection %p state = %u, queuing rwork\n",
+                    con, con->state);
                queue_work(recv_wq, &con->rwork);
        }
 }
@@ -33,9 +34,9 @@ static void ceph_write_space(struct sock *sk)
 {
         struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
 
-        printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state);
-        if (test_bit(WRITE_PEND, &con->state)) {
-                printk(KERN_INFO "WRITE_PEND set in connection\n");
+        dout(30, "ceph_write_space %p state = %u\n", con, con->state);
+        if (test_bit(WRITE_PENDING, &con->state)) {
+                dout(30, "ceph_write_space %p queueing write work\n", con);
                 queue_work(send_wq, &con->swork);
         }
 }
@@ -44,13 +45,14 @@ static void ceph_write_space(struct sock *sk)
 static void ceph_state_change(struct sock *sk)
 {
         struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
-        printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state);
 
+        dout(30, "ceph_state_change %p state = %u\n", con, con->state);
         if (sk->sk_state == TCP_ESTABLISHED) {
-                if (test_and_clear_bit(CONNECTING, &con->state) ||
-                   test_bit(ACCEPTING, &con->state))
-                        set_bit(OPEN, &con->state);
-                ceph_write_space(sk);
+               /*if (test_bit(CONNECTING, &con->state) ||
+                 test_bit(ACCEPTING, &con->state)) {*/
+                       dout(30, "ceph_state_change %p socket established, queuing swork\n", con);
+                       queue_work(send_wq, &con->swork);
+                       /*}*/
         }
 }
 
@@ -76,20 +78,18 @@ int ceph_tcp_connect(struct ceph_connection *con)
        int ret;
        struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
 
-        set_bit(CONNECTING, &con->state);
-
         ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
         if (ret < 0) {
                 derr(1, "ceph_tcp_connect sock_create_kern error: %d\n", ret);
                 goto done;
         }
 
-        /* setup callbacks */
         set_sock_callbacks(con->sock, (void *)con);
-
-        ret = con->sock->ops->connect(con->sock, paddr,
+        
+       ret = con->sock->ops->connect(con->sock, paddr,
                                       sizeof(struct sockaddr_in), O_NONBLOCK);
-        if (ret == -EINPROGRESS) return 0;
+        if (ret == -EINPROGRESS) 
+               return 0;
         if (ret < 0) {
                 /* TBD check for fatal errors, retry if not fatal.. */
                 derr(1, "ceph_tcp_connect kernel_connect error: %d\n", ret);
@@ -144,9 +144,7 @@ int ceph_tcp_listen(struct ceph_messenger *msgr)
                derr(0, "failed to getsockname: %d\n", ret);
                goto err;
        }
-       dout(0, "ceph_tcp_listen on %x:%d\n",
-            ntohl(myaddr->sin_addr.s_addr),
-            ntohs(myaddr->sin_port));
+       dout(0, "ceph_tcp_listen on port %d\n", ntohs(myaddr->sin_port));
 
        ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                                (char *)&optval, sizeof(optval)); 
@@ -208,7 +206,6 @@ int ceph_tcp_accept(struct socket *sock, struct ceph_connection *con)
                goto err;
        }
 
-        set_bit(ACCEPTING, &con->state);
 done:
        return ret;
 err:
@@ -225,18 +222,12 @@ int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
        struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;           /* length read */
 
-       printk(KERN_INFO "entered krevmsg\n");
+       dout(30, "ceph_tcp_recvmsg %p len %d\n", sock, (int)len);
        msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
-
        /* receive one kvec for now...  */
        rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
-        if (rlen < 0) {
-               printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen);
-        }
-       /* TBD: kernel_recvmsg doesn't fill in the name and namelen
-         */
+       dout(30, "ceph_tcp_recvmsg %p len %d ret = %d\n", sock, (int)len, rlen);
        return(rlen);
-
 }
 
 /*
@@ -247,13 +238,10 @@ int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t
        struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;
 
-       printk(KERN_INFO "entered ksendmsg\n");
+       dout(30, "ceph_tcp_sendmsg %p len %d\n", sock, (int)len);
        msg.msg_flags |=  MSG_DONTWAIT | MSG_NOSIGNAL;
-
        rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
-        if (rlen < 0) {
-               printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen);
-        }
+       dout(30, "ceph_tcp_sendmsg %p len %d ret = %d\n", sock, (int)len, rlen);
        return(rlen);
 }
 
index 4b8c0c4cd66af1e0e84de80acfaf1cc6593d3a6a..7cb981d62dbb28e468aee5593109cde557f54cff 100644 (file)
@@ -221,12 +221,14 @@ static void wait_for_new_map(struct ceph_mds_client *mdsc)
 
 void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
 {
+       spin_lock_init(&mdsc->lock);
        mdsc->client = client;
        mdsc->mdsmap = 0;  /* none yet */
        mdsc->sessions = 0;
        mdsc->max_sessions = 0;
        mdsc->last_tid = 0;
        INIT_RADIX_TREE(&mdsc->request_tree, GFP_KERNEL);
+       mdsc->last_requested_map = 0;
        init_completion(&mdsc->map_waiters);
 }
 
index 6ef6d4a4523abad198b8da0ab9b1800bbbe76500..d7cfe5f3fde57538c5bea9f0c07b9cd1543d3d13 100644 (file)
@@ -64,7 +64,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
 
-       spin_lock_init(&con->con_lock);
+       spin_lock_init(&con->lock);
        set_bit(NEW, &con->state);
        INIT_WORK(&con->rwork, try_read);       /* setup work structure */
        INIT_WORK(&con->swork, try_write);      /* setup work structure */
@@ -178,7 +178,7 @@ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connect
 
        dout(20, "__remove_connection %p from %p\n", con, msgr);
        list_del(&con->list_all);
-       if (test_bit(CONNECTING, &con->state) || 
+       if (test_bit(CONNECTING, &con->state) ||
            test_bit(OPEN, &con->state)) {
                /* remove from con_open too */
                key = hash_addr(&con->peer_addr);
@@ -231,10 +231,11 @@ static int write_partial_kvec(struct ceph_connection *con)
 {
        int ret;
 
+       dout(30, "write_partial_kvec %p left %d vec %d bytes\n", con, 
+            con->out_kvec_left, con->out_kvec_bytes);
        while (con->out_kvec_bytes > 0) {
                ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes);
-               if (ret < 0) return ret;  /* error */
-               if (ret == 0) return 0;   /* socket full */
+               if (ret <= 0) goto out;
                con->out_kvec_bytes -= ret;
                if (con->out_kvec_bytes == 0)
                        break;            /* done */
@@ -251,7 +252,11 @@ static int write_partial_kvec(struct ceph_connection *con)
                }
        }
        con->out_kvec_left = 0;
-       return 1;  /* done! */
+       ret = 1;
+out:
+       dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con, 
+            con->out_kvec_left, con->out_kvec_bytes, ret);
+       return ret;  /* done! */
 }
 
 static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg)
@@ -289,12 +294,15 @@ static void prepare_write_message(struct ceph_connection *con)
 
        /* move to sending/sent list */
        list_del(&m->list_head);
-       list_add(&m->list_head, &con->out_sent);
+       list_add_tail(&m->list_head, &con->out_sent);
        con->out_msg = m;
 
        /* encode header */
        ceph_encode_header(&con->out_hdr, &m->hdr);
 
+       dout(20, "prepare_write_message %p seq %d type %d len %d+%d\n", 
+            m, m->hdr.seq, m->hdr.type, m->hdr.front_len, m->hdr.data_len);
+
        /* tag + hdr + front */
        con->out_kvec[0].iov_base = &tag_msg;
        con->out_kvec[0].iov_len = 1;
@@ -309,6 +317,8 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_msg_pos.page = 0;
        con->out_msg_pos.page_pos = m->hdr.data_off & PAGE_MASK;
        con->out_msg_pos.data_pos = 0;
+
+       set_bit(WRITE_PENDING, &con->state);
 }
 
 /* 
@@ -325,6 +335,19 @@ static void prepare_write_ack(struct ceph_connection *con)
        con->out_kvec_left = 2;
        con->out_kvec_bytes = 1 + sizeof(con->in_seq_acked);
        con->out_kvec_cur = con->out_kvec;
+       set_bit(WRITE_PENDING, &con->state);
+}
+
+static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con)
+{
+       con->out_kvec[0].iov_base = &msgr->inst.addr;
+       con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
+       con->out_kvec[1].iov_base = &con->connect_seq;
+       con->out_kvec[1].iov_len = sizeof(con->connect_seq);
+       con->out_kvec_left = 2;
+       con->out_kvec_bytes = sizeof(msgr->inst.addr) + sizeof(con->connect_seq);
+       con->out_kvec_cur = con->out_kvec;
+       set_bit(WRITE_PENDING, &con->state);
 }
 
 static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con)
@@ -334,6 +357,7 @@ static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ce
        con->out_kvec_left = 1;
        con->out_kvec_bytes = sizeof(msgr->inst.addr);
        con->out_kvec_cur = con->out_kvec;
+       set_bit(WRITE_PENDING, &con->state);
 }
 
 static void prepare_write_accept_ready(struct ceph_connection *con)
@@ -343,7 +367,9 @@ static void prepare_write_accept_ready(struct ceph_connection *con)
        con->out_kvec_left = 1;
        con->out_kvec_bytes = 1;
        con->out_kvec_cur = con->out_kvec;
+       set_bit(WRITE_PENDING, &con->state);
 }
+
 static void prepare_write_accept_reject(struct ceph_connection *con)
 {
        con->out_kvec[0].iov_base = &tag_reject;
@@ -353,6 +379,7 @@ static void prepare_write_accept_reject(struct ceph_connection *con)
        con->out_kvec_left = 2;
        con->out_kvec_bytes = 1 + sizeof(con->connect_seq);
        con->out_kvec_cur = con->out_kvec;
+       set_bit(WRITE_PENDING, &con->state);
 }
 
 /*
@@ -371,18 +398,15 @@ more:
        /* kvec data queued? */
        if (con->out_kvec_left) {
                ret = write_partial_kvec(con);
-               if (ret == 0) 
-                       goto done;
-               
-               if (test_bit(REJECTING, &con->state)) {
+               if (test_and_clear_bit(REJECTING, &con->state)) {
+                       dout(30, "try_write done rejecting, state %u, closing\n", con->state);
                        /* FIXME do something else here, pbly? */
                        remove_connection(msgr, con);
                        set_bit(CLOSED, &con->state);
                        put_connection(con);
                }
-               
-               /* TBD: handle error; return for now */
-               if (ret < 0) {
+               if (ret <= 0) {
+                       /* TBD: handle error; return for now */
                        con->error = ret;
                        goto done; /* error */
                }
@@ -406,8 +430,9 @@ more:
        }
        
        /* hmm, nothing to do! No more writes pending? */
-       if (ret)
-               clear_bit(WRITE_PEND, &con->state);
+       dout(30, "try_write nothing else to write\n");
+       clear_bit(WRITE_PENDING, &con->state);
+
 done:
        return;
 }
@@ -535,8 +560,75 @@ static void process_ack(struct ceph_connection *con, __u32 ack)
 }
 
 
+/* 
+ * read portion of connect-side handshake on a new connection
+ */
+static int read_connect_partial(struct ceph_connection *con)
+{
+       int ret, to;
+       dout(20, "read_connect_partial %p start at %d\n", con, con->in_base_pos);
+
+       /* actual_peer_addr */
+       to = sizeof(con->actual_peer_addr);
+       while (con->in_base_pos < to) {
+               int left = to - con->in_base_pos;
+               int have = con->in_base_pos;
+               ret = ceph_tcp_recvmsg(con->sock, (char*)&con->actual_peer_addr + have, left);
+               if (ret <= 0) goto out;
+               con->in_base_pos += ret;
+       }
+
+       /* in_tag */
+       to = sizeof(con->actual_peer_addr) + 1;
+       if (con->in_base_pos < to) {
+               ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
+               if (ret <= 0) goto out;
+               con->in_base_pos += ret;
+       }
+
+       /* peer_connect_seq */
+       to += sizeof(con->peer_connect_seq);
+       if (con->in_base_pos < to) {
+               int left = to - con->in_base_pos;
+               int have = sizeof(con->peer_connect_seq) - left;
+               ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_connect_seq + have, left);
+               if (ret <= 0) goto out;
+               con->in_base_pos += ret;
+       }       
+       ret = 1;
+out:
+       dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret);
+       return ret; /* done */
+}
+
+static void process_connect(struct ceph_connection *con)
+{
+       dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
+       clear_bit(CONNECTING, &con->state);
+       if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) {
+               derr(1, "process_connect wrong peer, want %x:%d/%d, got %x:%d/%d, wtf\n",
+                    ntohl(con->peer_addr.ipaddr.sin_addr.s_addr), 
+                    ntohs(con->peer_addr.ipaddr.sin_port),
+                    con->peer_addr.nonce,
+                    ntohl(con->actual_peer_addr.ipaddr.sin_addr.s_addr), 
+                    ntohs(con->actual_peer_addr.ipaddr.sin_port),
+                    con->actual_peer_addr.nonce);
+               con->in_tag = CEPH_MSGR_TAG_REJECT;
+       }
+       if (con->in_tag == CEPH_MSGR_TAG_REJECT) {
+               dout(10, "process_connect got REJECT peer seq %u\n", con->peer_connect_seq);
+               set_bit(CLOSED, &con->state);
+       }
+       if (con->in_tag == CEPH_MSGR_TAG_READY) {
+               dout(10, "process_connect got READY, now open\n");
+               set_bit(OPEN, &con->state);
+       }
+}
+
+
+
 /*
- * read portion of handshake on a newly accepted connection
+ * read portion of accept-side handshake on a newly accepted connection
  */
 static int read_accept_partial(struct ceph_connection *con)
 {
@@ -572,7 +664,7 @@ static void process_accept(struct ceph_connection *con)
        spin_lock(&con->msgr->con_lock);
        existing = get_connection(con->msgr, &con->peer_addr);
        if (existing) {
-               spin_lock(&existing->con_lock);
+               spin_lock(&existing->lock);
                if ((test_bit(CONNECTING, &existing->state) && 
                     compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
                    (test_bit(OPEN, &existing->state) && 
@@ -584,20 +676,22 @@ static void process_accept(struct ceph_connection *con)
                        con->out_seq = existing->out_seq;
                        set_bit(OPEN, &con->state);
                        set_bit(CLOSED, &existing->state);
+                       clear_bit(OPEN, &existing->state);
                } else {
                        /* reject new connection */
                        set_bit(REJECTING, &con->state);
                        con->connect_seq = existing->connect_seq; /* send this with the reject */
                }
-               spin_unlock(&existing->con_lock);
+               spin_unlock(&existing->lock);
                put_connection(existing);
        } else {
                add_connection(con->msgr, con);
-               set_bit(OPEN, &con->state);
+               con->state = OPEN;
        }
        spin_unlock(&con->msgr->con_lock);
 
        /* the result? */
+       clear_bit(ACCEPTING, &con->state);
        if (test_bit(REJECTING, &con->state))
                prepare_write_accept_reject(con);
        else
@@ -615,6 +709,7 @@ static void try_read(struct work_struct *work)
        struct ceph_messenger *msgr;
 
        con = container_of(work, struct ceph_connection, rwork);
+       spin_lock(&con->lock);
        msgr = con->msgr;
 
 more:
@@ -624,12 +719,20 @@ more:
 
        if (test_bit(CLOSED, &con->state)) goto done;
        if (test_bit(ACCEPTING, &con->state)) {
+               dout(20, "try_read accepting\n");
                ret = read_accept_partial(con);
                if (ret <= 0) goto done;
                /* accepted */
                process_accept(con);
                goto more;
        }
+       if (test_bit(CONNECTING, &con->state)) {
+               dout(20, "try_read connecting\n");
+               ret = read_connect_partial(con);
+               if (ret <= 0) goto done;
+               process_connect(con);
+               if (test_bit(CLOSED, &con->state)) goto done;
+       }
 
        if (con->in_tag == CEPH_MSGR_TAG_READY) {
                ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
@@ -639,7 +742,8 @@ more:
                else if (con->in_tag == CEPH_MSGR_TAG_ACK)
                        prepare_read_ack(con);
                else {
-                       printk(KERN_INFO "bad tag %d\n", (int)con->in_tag);
+                       derr(2, "try_read got bad tag %d\n", (int)con->in_tag);
+                       ret = -EINVAL;
                        goto bad;
                }
                goto more;
@@ -662,10 +766,12 @@ more:
                con->in_tag = CEPH_MSGR_TAG_READY;
                goto more;
        }
+       derr(2, "try_read bad con->in_tag = %d\n", (int)con->in_tag);
 bad:
        BUG_ON(1); /* shouldn't get here */
 done:
        con->error = ret;
+       spin_unlock(&con->lock);
        return;
 }
 
@@ -688,21 +794,18 @@ static void try_accept(struct work_struct *work)
                        derr(1, "malloc failure\n");
                goto done;
                }
-
-       if(ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
+       if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
                derr(1, "error accepting connection\n");
-               kfree(new_con);
+               put_connection(new_con);
                 goto done;
         }
        dout(5, "accepted connection \n");
 
        new_con->in_tag = CEPH_MSGR_TAG_READY;
-
+       set_bit(ACCEPTING, &new_con->state);
        prepare_write_accept_announce(msgr, new_con);
-
        add_connection_accepting(msgr, new_con);
 
-       set_bit(WRITE_PEND, &new_con->state);
        /*
         * hand off to worker threads ,should be able to write, we want to 
         * try to write right away, we may have missed socket state change
@@ -715,7 +818,7 @@ done:
 /*
  * create a new messenger instance, creates listening socket
  */
-struct ceph_messenger *ceph_messenger_create()
+struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
 {
         struct ceph_messenger *msgr;
        int ret = 0;
@@ -735,7 +838,8 @@ struct ceph_messenger *ceph_messenger_create()
                kfree(msgr);
                return  ERR_PTR(ret);
        }
-
+       if (myaddr) 
+               msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr;
 
        dout(1, "ceph_messenger_create %p listening on %x:%d\n", msgr,
             ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), 
@@ -794,13 +898,15 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
                     ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
        }                    
 
-       spin_lock(&con->con_lock);
+       spin_lock(&con->lock);
 
        /* initiate connect? */
-       if (test_bit(NEW, &con->state)) {
-               dout(5, "ceph_msg_send initiating connect on %p\n", con);
+       dout(5, "ceph_msg_send connection %p state is %u\n", con, con->state);
+       if (test_and_clear_bit(NEW, &con->state)) {
+               set_bit(CONNECTING, &con->state);
+               dout(5, "ceph_msg_send initiating connect on %p new state %u\n", con, con->state);
                ret = ceph_tcp_connect(con);
-               if (ret < 0){
+               if (ret < 0) {
                        derr(1, "connection failure to peer %x:%d\n",
                             ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
                             ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
@@ -808,16 +914,17 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
                        put_connection(con);
                        return(ret);
                }
+               prepare_write_connect(msgr, con);
        }
        
        /* queue */
-       dout(1, "ceph_msg_send queuing outgoing message for %s%d on %p\n",
+       msg->hdr.seq = ++con->out_seq;
+       dout(1, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq,
             ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con);
        ceph_msg_get(msg);
+       list_add_tail(&msg->list_head, &con->out_queue);
 
-       list_add(&msg->list_head, &con->out_queue);
-       set_bit(WRITE_PEND, &con->state);
-       spin_unlock(&con->con_lock);
+       spin_unlock(&con->lock);
        put_connection(con);
        return ret;
 }
@@ -829,7 +936,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of
        struct ceph_msg *m;
        int i;
 
-       m = kzalloc(sizeof(*m), GFP_KERNEL);
+       m = kmalloc(sizeof(*m), GFP_KERNEL);
        if (m == NULL)
                goto out;
        atomic_set(&m->nref, 1);
@@ -853,7 +960,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of
                        if (m->pages[i] == NULL)
                                goto out2;
                }
+       } else {
+               m->pages = 0;
        }
+
+       INIT_LIST_HEAD(&m->list_head);
        return m;
 
 out2:
@@ -864,8 +975,9 @@ out:
 
 void ceph_msg_put(struct ceph_msg *m)
 {
+       int i;
        if (atomic_dec_and_test(&m->nref)) {
-               int i;
+               dout(30, "ceph_msg_put last one on %p\n", m);
                if (m->pages) {
                        for (i=0; i<m->nr_pages; i++)
                                if (m->pages[i])
index 71c35c015ce10b6f2e2f3ece44f12fe53362f6ed..5fa7c13b897ebbff7478243b8875827b96afc680 100644 (file)
@@ -52,13 +52,13 @@ struct ceph_msg_pos {
 
 
 /* current state of connection */
-#define NEW 1
-#define CONNECTING 2
-#define ACCEPTING 3
-#define OPEN 4
-#define WRITE_PEND 5
-#define REJECTING 6
-#define CLOSED 7
+#define NEW            1
+#define CONNECTING     2
+#define ACCEPTING      3
+#define OPEN           4
+#define WRITE_PENDING  5
+#define REJECTING      6
+#define CLOSED         7
 
 struct ceph_connection {
        struct ceph_messenger *msgr;
@@ -66,7 +66,7 @@ struct ceph_connection {
        __u32 state;            /* connection state */
        
        atomic_t nref;
-       spinlock_t con_lock;    /* connection lock */
+       spinlock_t lock;        /* connection lock */
 
        struct list_head list_all;   /* msgr->con_all */
        struct list_head list_bucket;  /* msgr->con_open or con_accepting */
@@ -76,6 +76,10 @@ struct ceph_connection {
        __u32 out_seq;               /* last message queued for send */
        __u32 in_seq, in_seq_acked;  /* last message received, acked */
 
+       /* connect state */
+       struct ceph_entity_addr actual_peer_addr;
+       __u32 peer_connect_seq;
+
        /* out queue */
        struct list_head out_queue;
        struct ceph_msg_header out_hdr;
@@ -103,7 +107,7 @@ struct ceph_connection {
 };
 
 
-extern struct ceph_messenger *ceph_messenger_create(void);
+extern struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr);
 extern void ceph_messenger_destroy(struct ceph_messenger *);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off);
@@ -118,22 +122,22 @@ extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg);
 static __inline__ int ceph_decode_64(void **p, void *end, __u64 *v) {
        if (unlikely(*p + sizeof(*v) > end))
                return -EINVAL;
-       *v = le64_to_cpu(*(__u64*)p);
-       p += sizeof(*v);
+       *v = le64_to_cpu(*(__u64*)*p);
+       *p += sizeof(*v);
        return 0;
 }
 static __inline__ int ceph_decode_32(void **p, void *end, __u32 *v) {
        if (unlikely(*p + sizeof(*v) > end))
                return -EINVAL;
-       *v = le32_to_cpu(*(__u32*)p);
-       p += sizeof(*v);
+       *v = le32_to_cpu(*(__u32*)*p);
+       *p += sizeof(*v);
        return 0;
 }
 static __inline__ int ceph_decode_16(void **p, void *end, __u16 *v) {
        if (unlikely(*p + sizeof(*v) > end))
                return -EINVAL;
-       *v = le16_to_cpu(*(__u16*)p);
-       p += sizeof(*v);
+       *v = le16_to_cpu(*(__u16*)*p);
+       *p += sizeof(*v);
        return 0;
 }
 static __inline__ int ceph_decode_copy(void **p, void *end, void *v, int len) {
@@ -159,10 +163,10 @@ static __inline__ int ceph_decode_addr(void **p, void *end, struct ceph_entity_a
 static __inline__ int ceph_decode_name(void **p, void *end, struct ceph_entity_name *v) {
        if (unlikely(*p + sizeof(*v) > end))
                return -EINVAL;
-       v->type = le32_to_cpu(*(__u32*)p);
-       p += sizeof(__u32);
-       v->num = le32_to_cpu(*(__u32*)p);
-       p += sizeof(__u32);
+       v->type = le32_to_cpu(*(__u32*)*p);
+       *p += sizeof(__u32);
+       v->num = le32_to_cpu(*(__u32*)*p);
+       *p += sizeof(__u32);
        return 0;
 }
 
@@ -207,15 +211,15 @@ static __inline__ void ceph_decode_header(struct ceph_msg_header *to)
 
 static __inline__ int ceph_encode_64(void **p, void *end, __u64 v) {
        BUG_ON(*p + sizeof(v) > end);
-       *(__u64*)p = cpu_to_le64(v);
-       p += sizeof(v);
+       *(__u64*)*p = cpu_to_le64(v);
+       *p += sizeof(v);
        return 0;
 }
 
 static __inline__ int ceph_encode_32(void **p, void *end, __u32 v) {
        BUG_ON(*p + sizeof(v) > end);
-       *(__u32*)p = cpu_to_le64(v);
-       p += sizeof(v);
+       *(__u32*)*p = cpu_to_le64(v);
+       *p += sizeof(v);
        return 0;
 }
 
index 35e8a51bdc6e87660beaef874cbc130d113c258b..b3d8d22d29c5067caefc15eb7b190212722d1c5e 100644 (file)
@@ -312,6 +312,10 @@ void ceph_osdc_init(struct ceph_osd_client *osdc)
 {
        dout(5, "ceph_osdc_init\n");
        osdc->osdmap = NULL;
+       osdc->last_tid = 0;
+       INIT_RADIX_TREE(&osdc->request_tree, GFP_KERNEL);
+       osdc->last_requested_map = 0;
+       init_completion(&osdc->map_waiters);
 }
 
 
index f68aa4ff2c0578fdcc4ff9120ca3c075908418fd..ccfa7a3d11e6eb562152d09a6d7bd9656de05e36 100644 (file)
@@ -184,14 +184,16 @@ enum {
        Opt_fsidmajor, 
        Opt_fsidminor,
        Opt_debug,
-       Opt_monport     
+       Opt_monport,
+       Opt_ip
 };
 
 static match_table_t arg_tokens = {
        {Opt_fsidmajor, "fsidmajor=%ld"},
        {Opt_fsidminor, "fsidminor=%ld"},
        {Opt_debug, "debug=%d"},
-       {Opt_monport, "monport=%d"}
+       {Opt_monport, "monport=%d"},
+       {Opt_ip, "ip=%s"}
 };
 
 /*
@@ -207,18 +209,20 @@ static int parse_ip(const char *c, int len, struct ceph_entity_addr *addr)
        dout(15, "parse_ip on '%s' len %d\n", c, len);
        for (i=0; *p && i<4; i++) {
                v = 0;
+               //dout(30, " i=%d at %s\n", i, p);
                while (*p && *p != '.' && p < c+len) {
                        if (*p < '0' || *p > '9')
                                goto bad;
                        v = (v * 10) + (*p - '0');
                        p++;
                }
+               //dout(30, " v = %d\n", v);
                ip = (ip << 8) + v;
                if (!*p) 
                        break;
                p++;
        }
-       if (i < 4
+       if (p < c+len
                goto bad;
 
        *(__u32*)&addr->ipaddr.sin_addr.s_addr = htonl(ip);
@@ -269,16 +273,17 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru
        
        /* parse mount options */
        while ((c = strsep(&options, ",")) != NULL) {
-               int token;
-               int intval;
-               int ret;
+               int token, intval, ret, i;
                if (!*c) 
                        continue;
                token = match_token(c, arg_tokens, argstr);
-               ret = match_int(&argstr[0], &intval);
-               if (ret < 0) {
-                       dout(0, "bad mount arg, not int\n");
-                       continue;
+               if (token < Opt_ip) {
+                       ret = match_int(&argstr[0], &intval);
+                       if (ret < 0) {
+                               dout(0, "bad mount arg, not int\n");
+                               continue;
+                       }
+                       dout(30, "got token %d intval %d\n", token, intval);
                }
                switch (token) {
                case Opt_fsidmajor:
@@ -288,17 +293,23 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, stru
                        args->fsid.minor = intval;
                        break;
                case Opt_monport:
+                       dout(25, "parse_mount_args monport=%d\n", intval);
                        args->mon_port = intval;
+                       for (i=0; i<args->num_mon; i++)
+                               args->mon_addr[i].ipaddr.sin_port = htons(intval);
                        break;
                case Opt_debug:
                        ceph_debug = intval;
                        break;
+               case Opt_ip:
+                       parse_ip(argstr[0].from, argstr[0].to-argstr[0].from, &args->my_addr);
+                       break;
                default:
-                       derr(1, "bad mount option %s\n", c);
+                       derr(1, "parse_mount_args bad token %d\n", token);
                        continue;
                }
        }
-       
+
        return 0;
 }
 
index 3055e978c2513522f3f8e5e3b9f4fd6655469733..4da42ac81aa063659695dd880833ac1ddc184845 100644 (file)
@@ -16,6 +16,7 @@ struct ceph_mount_args {
        int mntflags;
        int flags;
        struct ceph_fsid fsid;
+       struct ceph_entity_addr my_addr;
        int num_mon;
        struct ceph_entity_addr mon_addr[5];
        int mon_port;