]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
added socket callbacks, worker threads etc..
author patiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 22 Nov 2007 01:07:22 +0000 (01:07 +0000)
committer patiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 22 Nov 2007 01:07:22 +0000 (01:07 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2106 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/ktcp.c
trunk/ceph/kernel/ktcp.h
trunk/ceph/kernel/messenger.c
trunk/ceph/kernel/messenger.h

index 9bf03272e140381a5e0cb81f8edb9d764358b4de..d6bb68e1391b889cd1630526a421f86f1ee5aebb 100644 (file)
 #include "messenger.h"
 #include "ktcp.h"
 
+static struct workqueue_struct *recv_wq;        /* receive work queue */
+static struct workqueue_struct *send_wq;        /* send work queue */
 
-struct socket * _kconnect(struct sockaddr *saddr)
+/*
+ * socket callback functions 
+ */
+/* sk_data_ready hook: data is available; hand the read off to recv_wq */
+static void ceph_data_ready(struct sock *sk, int count_unused)
+{
+        struct ceph_connection *conn = sk->sk_user_data;
+
+        printk(KERN_INFO "Entered ceph_data_ready state = %u\n", conn->state);
+        queue_work(recv_wq, &conn->rwork);
+}
+
+/* sk_write_space hook: the socket has buffer space for writing */
+static void ceph_write_space(struct sock *sk)
+{
+        struct ceph_connection *conn = sk->sk_user_data;
+
+        printk(KERN_INFO "Entered ceph_write_space state = %u\n",conn->state);
+        if (!test_bit(WRITE_PEND, &conn->state))
+                return;         /* no write pending on this connection */
+        printk(KERN_INFO "WRITE_PEND set in connection\n");
+        queue_work(send_wq, &conn->swork);
+}
+
+/* sk_state_change hook: the socket's state has changed */
+static void ceph_state_change(struct sock *sk)
+{
+        struct ceph_connection *conn = sk->sk_user_data;
+        /* TBD: probably want to set our connection state to OPEN
+         * if state not set to READ or WRITE pending
+         */
+        printk(KERN_INFO "Entered ceph_state_change state = %u\n", conn->state);
+        if (sk->sk_state != TCP_ESTABLISHED)
+                return;         /* only an established socket is acted on */
+        if (test_and_clear_bit(CONNECTING, &conn->state))
+                set_bit(OPEN, &conn->state);    /* CONNECTING -> OPEN */
+        ceph_write_space(sk);   /* queues send work if WRITE_PEND is set */
+}
+
+/* make a socket active: attach con to it and set up the callback functions */
+int add_sock_callbacks(struct socket *sock, struct ceph_connection *con)
+{
+        /* the callbacks read the connection back out of sk_user_data */
+        sock->sk->sk_user_data = con;
+        printk(KERN_INFO "Entered add_sock_callbacks\n");
+
+        /* Install callbacks */
+        sock->sk->sk_data_ready = ceph_data_ready;
+        sock->sk->sk_write_space = ceph_write_space;
+        sock->sk->sk_state_change = ceph_state_change;
+
+        return 0;       /* no failure path */
+}
+/*
+ * initiate connection to a remote socket.
+ */
+int _kconnect(struct ceph_connection *con)
 {
        int ret;
-       struct socket *sd = NULL;
+       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
 
-/* TBD: somewhere check for a connection already established to this node? */
-/*      if we are keeping connections alive for a period of time           */
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
+        set_bit(CONNECTING, &con->state);
+
+        ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
         if (ret < 0) {
-               printk(KERN_INFO "sock_create_kern error: %d\n", ret);
-       } else {
-       /* or could call kernel_connect(), opted to reduce call overhead */
-               ret = sd->ops->connect(sd, (struct sockaddr *) saddr,
-                                         sizeof (struct sockaddr_in),0);
-               if (ret < 0) {
-                       printk(KERN_INFO "kernel_connect error: %d\n", ret);
-                       sock_release(sd);
-               }
-       }
-       return(sd);
+                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+                goto done;
+        }
+
+        /* setup callbacks */
+        add_sock_callbacks(con->sock, con);
+
+
+        ret = con->sock->ops->connect(con->sock, paddr,
+                                      sizeof(struct sockaddr_in), O_NONBLOCK);
+        if (ret == -EINPROGRESS) return 0;
+        if (ret < 0) {
+                /* TBD check for fatal errors, retry if not fatal.. */
+                printk(KERN_INFO "kernel_connect error: %d\n", ret);
+                sock_release(con->sock);
+                con->sock = NULL;
+        }
+done:
+        return ret;
 }
 
-struct socket * _klisten(struct sockaddr_in *in_addr)
+int _klisten(struct ceph_messenger *msgr)
 {
        int ret;
-       struct socket *sd = NULL;
+       struct socket *sock = NULL;
+       int optval = 1;
+       struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
 
 
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
+       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
         if (ret < 0) {
                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
-               return ERR_PTR(ret);
+               return ret;
        }
+       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                               (char *)&optval, sizeof(optval)); 
+       if (ret < 0) {
+               printk("Failed to set SO_REUSEADDR: %d\n", ret);
+               goto err;
+       }
+
+       /* set user_data to be the messenger */
+       sock->sk->sk_user_data = msgr;
 
        /* no user specified address given so create, will allow arg to mount */
-       if (!in_addr->sin_addr.s_addr) {
-               in_addr->sin_family = AF_INET;
-               in_addr->sin_addr.s_addr = htonl(INADDR_ANY);
-               in_addr->sin_port = htons(CEPH_PORT);  /* known port for now */
-               /* in_addr->sin_port = htons(0); */  /* any port */
+       myaddr->sin_family = AF_INET;
+       myaddr->sin_addr.s_addr = htonl(INADDR_ANY);
+       myaddr->sin_port = htons(CEPH_PORT);  /* known port for now */
+       /* myaddr->sin_port = htons(0); */  /* any port */
+       ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, 
+                               sizeof(struct sockaddr_in));
+       if (ret < 0) {
+               printk("Failed to bind to port %d\n", ret);
+               goto err;
        }
 
-/* TBD: set sock options... */
-       /*  ret = kernel_setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)optval, optlen); 
+       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+                               (char *)&optval, sizeof(optval)); 
        if (ret < 0) {
-               printk("Failed to set SO_REUSEADDR: %d\n", ret);
-       }  */
-       ret = sd->ops->bind(sd, (struct sockaddr*)in_addr, sizeof(in_addr));
-/* TBD: probaby want to tune the backlog queue .. */
-       ret = sd->ops->listen(sd, NUM_BACKUP);
+               printk("Failed to set SO_KEEPALIVE: %d\n", ret);
+               goto err;
+       }
+
+       msgr->listen_sock = sock;
+
+       /* TBD: probably want to tune the backlog queue .. */
+       ret = sock->ops->listen(sock, NUM_BACKUP);
        if (ret < 0) {
                printk(KERN_INFO "kernel_listen error: %d\n", ret);
-               sock_release(sd);
-               sd = NULL;
+               msgr->listen_sock = NULL;
+               goto err;
        }
-       return(sd);
+       return ret;
+err:
+       sock_release(sock);
+       return ret;
 }
 
 /*
  * Note: Maybe don't need this, or make inline... keep for now for debugging..
  * we may need to add more functionality
  */
-struct socket *_kaccept(struct socket *sd)
+struct socket *_kaccept(struct socket *sock)
 {
-       struct socket *new_sd = NULL;
+       struct socket *new_sock = NULL;
        int ret;
 
-
-       ret = kernel_accept(sd, &new_sd, sd->file->f_flags);
+       
+       ret = kernel_accept(sock, &new_sock, sock->file->f_flags);
         if (ret < 0) {
                printk(KERN_INFO "kernel_accept error: %d\n", ret);
-               return(new_sd);
+               return(new_sock);
        }
 /* TBD:  shall we check name for validity?  */
-       return(new_sd);
+       return(new_sock);
 }
 
 /*
  * receive a message this may return after partial send
  */
-int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags)
+int _krecvmsg(struct socket *sock, void *buf, size_t len)
 {
        struct kvec iov = {buf, len};
-       struct msghdr msg = {.msg_flags = msgflags};
+       struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;           /* length read */
 
        printk(KERN_INFO "entered krevmsg\n");
        msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
 
        /* receive one kvec for now...  */
-       rlen = kernel_recvmsg(sd, &msg, &iov, 1, len, msg.msg_flags);
+       rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
         if (rlen < 0) {
                printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen);
         }
+       /* TBD: kernel_recvmsg doesn't fill in the name and namelen
+         */
        return(rlen);
 
 }
@@ -108,32 +196,77 @@ int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags)
 /*
  * Send a message this may return after partial send
  */
-int _ksendmsg(struct socket *sd, struct kvec *iov, 
-               size_t kvlen, size_t len, unsigned msgflags)
+int _ksendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len)
 {
-       struct msghdr msg = {.msg_flags = msgflags};
+       struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;
 
        printk(KERN_INFO "entered ksendmsg\n");
        msg.msg_flags |=  MSG_DONTWAIT | MSG_NOSIGNAL;
 
-       rlen = kernel_sendmsg(sd, &msg, iov, kvlen, len);
+       rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
         if (rlen < 0) {
                printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen);
         }
        return(rlen);
 }
 
-struct sockaddr *_kgetname(struct socket *sd)
+struct sockaddr *_kgetname(struct socket *sock)
 {
        struct sockaddr *saddr = NULL;
        int len;
        int ret;
 
-       if ((ret = sd->ops->getname(sd, (struct sockaddr *)saddr,
+       if ((ret = sock->ops->getname(sock, (struct sockaddr *)saddr,
                                        &len, 2) < 0)) {
                printk(KERN_INFO "kernel getname error: %d\n", ret);
        }
        return(saddr);
 
 }
+/*
+ * Workqueue initialization: create the receive and send workqueues.
+ * Returns 0 on success, -ENOMEM if either queue cannot be created.
+ */
+int work_init(void)
+{
+        printk(KERN_INFO "entered work_init\n");
+        /*
+         * Create one thread per CPU to handle receive requests
+         * note: we can create more threads if needed to even out
+         * the scheduling of multiple requests..
+         */
+        recv_wq = create_workqueue("ceph-recv");
+        /* create_workqueue() reports failure as NULL, not ERR_PTR() */
+        if (!recv_wq) {
+                printk(KERN_INFO "receive worker failed to start: %d\n",
+                       -ENOMEM);
+                return -ENOMEM;
+        }
+
+        /*
+         * Create a single thread to handle send requests
+         * note: may use same thread pool as receive workers later...
+         */
+        send_wq = create_singlethread_workqueue("ceph-send");
+        if (!send_wq) {
+                printk(KERN_INFO "send worker failed to start: %d\n",
+                       -ENOMEM);
+                /* don't leak the receive queue created above */
+                destroy_workqueue(recv_wq);
+                recv_wq = NULL;
+                return -ENOMEM;
+        }
+        printk(KERN_INFO "successfully created wrkqueues\n");
+
+        return 0;
+}
+
+/*
+ * Workqueue shutdown: destroy the send queue, then the receive queue.
+ */
+void shutdown_workqueues(void)
+{
+        destroy_workqueue(send_wq);
+        destroy_workqueue(recv_wq);
+}
index 279a56f090d42f5387dad2e0a100d5654ab1d578..0a16444207e77bc10f8827d495fb768d86f573fa 100644 (file)
@@ -2,11 +2,11 @@
 #define _FS_CEPH_TCP_H
 
 /* prototype definitions */
-struct socket * _kconnect(struct sockaddr *);
-struct socket * _klisten(struct sockaddr_in *);
+int _kconnect(struct ceph_connection *);
+int _klisten(struct ceph_messenger *);
 struct socket *_kaccept(struct socket *);
-int _krecvmsg(struct socket *, void *, size_t , unsigned);
-int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned);
+int _krecvmsg(struct socket *, void *, size_t );
+int _ksendmsg(struct socket *, struct kvec *, size_t, size_t);
 
 /* Well known port for ceph client listener.. */
 #define CEPH_PORT 2002
index 8c43d1c060727a1bee5f3f94d920f00919da0536..f4fd71d3b5e0f92f65e380b65554cfc3befa230d 100644 (file)
@@ -61,6 +61,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
        con->msgr = msgr;
 
        spin_lock_init(&con->con_lock);
+       set_bit(NEW, &con->state);
        INIT_WORK(&con->rwork, try_read);       /* setup work structure */
        INIT_WORK(&con->swork, try_write);      /* setup work structure */
 
@@ -162,8 +163,8 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio
 
        spin_lock(&msgr->con_lock);
        list_del(&con->list_all);
-       if (con->state == CONNECTING ||
-           con->state == OPEN) {
+       if (test_bit(CONNECTING, &con->state) || 
+           test_bit(OPEN, &con->state)) {
                /* remove from con_open too */
                if (list_empty(&con->list_bucket)) {
                        /* last one */
@@ -190,30 +191,6 @@ static void replace_connection(struct ceph_messenger *msgr, struct ceph_connecti
        spin_unlock(&msgr->con_lock);
        put_connection(old); /* dec reference count */
 }
-               
-
-/*
- * initiate connection to a remote socket.
- *
- * TBD: make this async, somehow!
- */
-static int do_connect(struct ceph_connection *con)
-{
-       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
-
-       dout(1, "do_connect on %p\n", con);
-       con->sock = _kconnect(paddr);
-       if (IS_ERR(con->sock)) {
-               con->sock = 0;
-               return PTR_ERR(con->sock);
-       }
-
-       /* setup callbacks */
-       
-
-       return 0;
-}
-
 
 /*
  * non-blocking versions
@@ -232,7 +209,7 @@ static int write_partial_kvec(struct ceph_connection *con)
        int ret;
 
        while (con->out_kvec_bytes > 0) {
-               ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes, 0);
+               ret = _ksendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes);
                if (ret < 0) return ret;  /* error */
                if (ret == 0) return 0;   /* socket full */
                con->out_kvec_bytes -= ret;
@@ -263,7 +240,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg
                kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos;
                kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 
                                 (int)(msg->hdr.data_len - con->out_msg_pos.data_pos));
-               ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len, 0);
+               ret = _ksendmsg(con->sock, &kv, 1, kv.iov_len);
                if (ret < 0) return ret;
                if (ret == 0) return 0;   /* socket full */
                con->out_msg_pos.data_pos += ret;
@@ -374,10 +351,10 @@ more:
                if (ret == 0) 
                        goto done;
                
-               if (con->state == REJECTING) {
+               if (test_bit(REJECTING, &con->state)) {
                        /* FIXME do something else here, pbly? */
                        remove_connection(msgr, con);
-                       con->state = CLOSED;  
+                       set_bit(CLOSED, &con->state);
                        put_connection(con);
                }
                
@@ -440,7 +417,7 @@ static int read_message_partial(struct ceph_connection *con)
        /* header */
        while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
                left = sizeof(struct ceph_msg_header) - con->in_base_pos;
-               ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0);
+               ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
                if (con->in_base_pos == sizeof(struct ceph_msg_header)) {
@@ -458,7 +435,7 @@ static int read_message_partial(struct ceph_connection *con)
                                return -ENOMEM;
                }
                left = m->hdr.front_len - m->front.iov_len;
-               ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left, 0);
+               ret = _krecvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left);
                if (ret <= 0) return ret;
                m->front.iov_len += ret;
        }
@@ -480,7 +457,7 @@ static int read_message_partial(struct ceph_connection *con)
                left = min((int)(m->hdr.data_len - con->in_msg_pos.data_pos),
                           (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
                p = kmap(m->pages[con->in_msg_pos.page]);
-               ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left, 0);
+               ret = _krecvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
                if (ret <= 0) return ret;
                con->in_msg_pos.data_pos += ret;
                con->in_msg_pos.page_pos += ret;
@@ -511,7 +488,7 @@ static int read_ack_partial(struct ceph_connection *con)
 {
        while (con->in_base_pos < sizeof(con->in_partial_ack)) {
                int left = sizeof(con->in_partial_ack) - con->in_base_pos;
-               int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left, 0);
+               int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -541,7 +518,7 @@ static int read_accept_partial(struct ceph_connection *con)
        /* peer addr */
        while (con->in_base_pos < sizeof(con->peer_addr)) {
                int left = sizeof(con->peer_addr) - con->in_base_pos;
-               ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left, 0);
+               ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -550,7 +527,7 @@ static int read_accept_partial(struct ceph_connection *con)
        while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) {
                int off = con->in_base_pos - sizeof(con->peer_addr);
                int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos;
-               ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left,0);
+               ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -569,30 +546,32 @@ static void process_accept(struct ceph_connection *con)
        existing = get_connection(con->msgr, &con->peer_addr);
        if (existing) {
                spin_lock(&existing->con_lock);
-               if ((existing->state == CONNECTING && compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
-                   (existing->state == OPEN && con->connect_seq == existing->connect_seq)) {
+               if ((test_bit(CONNECTING, &existing->state) && 
+                    compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
+                   (test_bit(OPEN, &existing->state) && 
+                    con->connect_seq == existing->connect_seq)) {
                        /* replace existing with new connection */
                        replace_connection(con->msgr, existing, con);
                        /* steal message queue */
                        list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */
                        con->out_seq = existing->out_seq;
-                       con->state = OPEN;
-                       existing->state = CLOSED;
+                       set_bit(OPEN, &con->state);
+                       set_bit(CLOSED, &existing->state);
                } else {
                        /* reject new connection */
-                       con->state = REJECTING;
+                       set_bit(REJECTING, &con->state);
                        con->connect_seq = existing->connect_seq; /* send this with the reject */
                }
                spin_unlock(&existing->con_lock);
                put_connection(existing);
        } else {
                add_connection(con->msgr, con);
-               con->state = OPEN;
+               set_bit(OPEN, &con->state);
        }
        spin_unlock(&con->msgr->con_lock);
 
        /* the result? */
-       if (con->state == REJECTING)
+       if (test_bit(REJECTING, &con->state))
                prepare_write_accept_reject(con);
        else
                prepare_write_accept_ready(con);
@@ -615,13 +594,10 @@ more:
        /*
         * TBD: maybe store error in ceph_connection
          */
-       /* if (con->state == CLOSED) return -1; */
 
        if (test_bit(CLOSED, &con->state)) goto done;
        if (test_bit(ACCEPTING, &con->state)) {
                ret = read_accept_partial(con);
-               /* TBD: do something with error */
-               /* if (ret <= 0) return ret; */
                if (ret <= 0) goto done;
                /* accepted */
                process_accept(con);
@@ -629,9 +605,7 @@ more:
        }
 
        if (con->in_tag == CEPH_MSGR_TAG_READY) {
-               ret = _krecvmsg(con->sock, &con->in_tag, 1, 0);
-               /* if (ret <= 0) return ret; */
-               /* TBD: do something with error */
+               ret = _krecvmsg(con->sock, &con->in_tag, 1);
                if (ret <= 0) goto done;
                if (con->in_tag == CEPH_MSGR_TAG_MSG) 
                        prepare_read_message(con);
@@ -645,8 +619,6 @@ more:
        }
        if (con->in_tag == CEPH_MSGR_TAG_MSG) {
                ret = read_message_partial(con);
-               /* if (ret <= 0) return ret; */
-               /* TBD: do something with error */
                if (ret <= 0) goto done;
                /* got a full message! */
                msgr->dispatch(con->msgr->parent, con->in_msg);
@@ -657,8 +629,6 @@ more:
        }
        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
                ret = read_ack_partial(con);
-               /* if (ret <= 0) return ret; */
-               /* TBD: do something with error */
                if (ret <= 0) goto done;
                /* got an ack */
                process_ack(con, con->in_partial_ack);
@@ -668,6 +638,7 @@ more:
 bad:
        BUG_ON(1); /* shouldn't get here */
 done:
+       con->error = ret;
        return;
 }
 
@@ -733,7 +704,7 @@ done:
 struct ceph_messenger *ceph_messenger_create()
 {
         struct ceph_messenger *msgr;
-       struct sockaddr_in saddr;
+       int ret = 0;
 
         msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
         if (msgr == NULL) 
@@ -742,18 +713,12 @@ struct ceph_messenger *ceph_messenger_create()
        spin_lock_init(&msgr->con_lock);
 
        /* create listening socket */
-       msgr->listen_sock = _klisten(&saddr);
-       if (IS_ERR(msgr->listen_sock)) {
-               int err = PTR_ERR(msgr->listen_sock);
+       ret = _klisten(msgr);
+       if(ret < 0) {
                kfree(msgr);
-               return ERR_PTR(err);
+               return  ERR_PTR(ret);
        }
 
-       /* determine my ip:port */
-       msgr->inst.addr.ipaddr.sin_family = saddr.sin_family;
-       msgr->inst.addr.ipaddr.sin_port = saddr.sin_port;
-       msgr->inst.addr.ipaddr.sin_addr = saddr.sin_addr;
-
        /* TBD: setup callback for accept */
        INIT_WORK(&msgr->awork, try_accept);       /* setup work structure */
 
@@ -772,6 +737,7 @@ struct ceph_messenger *ceph_messenger_create()
 int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
 {
        struct ceph_connection *con;
+       int ret = 0;
        
        /* set source */
        msg->hdr.src = msgr->inst;
@@ -787,7 +753,6 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
                     ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), 
                     ntohl(msg->hdr.dst.addr.ipaddr.sin_port));
                con->peer_addr = msg->hdr.dst.addr;
-               con->state = CONNECTING;
                add_connection(msgr, con);
        } else {
                dout(5, "had connection to peer %x:%d\n",
@@ -799,8 +764,18 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
        spin_lock(&con->con_lock);
 
        /* initiate connect? */
-       if (con->state == CONNECTING)
-               do_connect(con);
+       if (test_bit(NEW, &con->state)) {
+               ret = _kconnect(con);
+               if (ret < 0){
+                       derr(1, "connection failure to peer %x:%d\n",
+                            ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
+                            ntohl(msg->hdr.dst.addr.ipaddr.sin_port));
+                       remove_connection(msgr, con);
+                       kfree(con);
+                       return(ret);
+
+               }
+       }
        
        /* queue */
        dout(1, "queuing outgoing message for %s.%d\n",
@@ -810,7 +785,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
        
        spin_unlock(&con->con_lock);
        put_connection(con);
-       return 0;
+       return ret;
 }
 
 
index 1aacd77724c0f5590279a168f032b0022df80953..b2048a47b4b50dd69aba502ab9be5ef5489bbed6 100644 (file)
@@ -52,20 +52,18 @@ struct ceph_msg_pos {
 
 
 /* current state of connection */
-enum ceph_connection_state {
-       NEW = 1,
-       ACCEPTING = 2,
-       CONNECTING = 4,
-       OPEN = 8,
-       REJECTING = 16,
-       CLOSED = 32,
-       READ_PEND = 64,
-       WRITE_PEND = 128
-};
+#define NEW 1
+#define CONNECTING 2
+#define ACCEPTING 3
+#define OPEN 4
+#define WRITE_PEND 5
+#define REJECTING 6
+#define CLOSED 7
 
 struct ceph_connection {
        struct ceph_messenger *msgr;
        struct socket *sock;    /* connection socket */
+       __u32 state;            /* connection state */
        
        atomic_t nref;
        spinlock_t con_lock;    /* connection lock */
@@ -74,7 +72,6 @@ struct ceph_connection {
        struct list_head list_bucket;  /* msgr->con_open or con_accepting */
 
        struct ceph_entity_addr peer_addr; /* peer address */
-       enum ceph_connection_state state;
        __u32 connect_seq;     
        __u32 out_seq;               /* last message queued for send */
        __u32 in_seq, in_seq_acked;  /* last message received, acked */