From 5bfe10c1333c15bec420f4f24a1fa55c8b3b0f06 Mon Sep 17 00:00:00 2001 From: patiencew Date: Thu, 22 Nov 2007 06:47:10 +0000 Subject: [PATCH] _kaccept uniformity git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2112 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/ktcp.c | 63 ++++++++++++++++++++--------------- trunk/ceph/kernel/ktcp.h | 5 ++- trunk/ceph/kernel/messenger.c | 47 ++++++++++---------------- 3 files changed, 58 insertions(+), 57 deletions(-) diff --git a/trunk/ceph/kernel/ktcp.c b/trunk/ceph/kernel/ktcp.c index 6c38926af1680..b4f9d4a45ac18 100644 --- a/trunk/ceph/kernel/ktcp.c +++ b/trunk/ceph/kernel/ktcp.c @@ -5,8 +5,8 @@ #include "messenger.h" #include "ktcp.h" -static struct workqueue_struct *recv_wq; /* receive work queue */ -static struct workqueue_struct *send_wq; /* send work queue */ +struct workqueue_struct *recv_wq; /* receive work queue */ +struct workqueue_struct *send_wq; /* send work queue */ /* * socket callback functions @@ -47,7 +47,8 @@ static void ceph_state_change(struct sock *sk) printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state); if (sk->sk_state == TCP_ESTABLISHED) { - if (test_and_clear_bit(CONNECTING, &con->state)) + if (test_and_clear_bit(CONNECTING, &con->state) || + test_bit(ACCEPTING, &con->state)) set_bit(OPEN, &con->state); ceph_write_space(sk); } @@ -158,22 +159,45 @@ err: } /* - * Note: Maybe don't need this, or make inline... keep for now for debugging.. - * we may need to add more functionality + * accept a connection */ -struct socket *_kaccept(struct socket *sock) +int _kaccept(struct socket *sock, struct ceph_connection *con) { - struct socket *new_sock = NULL; int ret; + struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; + int len; - - ret = kernel_accept(sock, &new_sock, sock->file->f_flags); + + ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); if (ret < 0) { - printk(KERN_INFO "kernel_accept error: %d\n", ret); - return(new_sock); + printk(KERN_INFO "sock_create_kern error: %d\n", ret); + goto done; + } + + /* setup callbacks */ + add_sock_callbacks(con->sock, con); + + + ret = sock->ops->accept(sock, con->sock, O_NONBLOCK); + /* ret = kernel_accept(sock, &new_sock, sock->file->f_flags); */ + if (ret < 0) { + printk(KERN_INFO "accept error: %d\n", ret); + goto err; } -/* TBD: shall we check name for validity? */ - return(new_sock); + con->sock->ops = sock->ops; + con->sock->type = sock->type; + ret = con->sock->ops->getname(con->sock, paddr, &len, 2); + if (ret < 0) { + printk(KERN_INFO "getname error: %d\n", ret); + goto err; + } + + set_bit(ACCEPTING, &con->state); +done: + return ret; +err: + sock_release(con->sock); + return ret; } /* @@ -217,19 +241,6 @@ int _ksendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len) return(rlen); } -struct sockaddr *_kgetname(struct socket *sock) -{ - struct sockaddr *saddr = NULL; - int len; - int ret; - - if ((ret = sock->ops->getname(sock, (struct sockaddr *)saddr, - &len, 2) < 0)) { - printk(KERN_INFO "kernel getname error: %d\n", ret); - } - return(saddr); - -} /* * workqueue initialization */ diff --git a/trunk/ceph/kernel/ktcp.h b/trunk/ceph/kernel/ktcp.h index 0a16444207e77..f0e07f669aeed 100644 --- a/trunk/ceph/kernel/ktcp.h +++ b/trunk/ceph/kernel/ktcp.h @@ -1,10 +1,13 @@ #ifndef _FS_CEPH_TCP_H #define _FS_CEPH_TCP_H +extern struct workqueue_struct *recv_wq; /* receive work queue */ +extern struct workqueue_struct *send_wq; /* send work queue */ + /* prototype definitions */ int _kconnect(struct ceph_connection *); int _klisten(struct ceph_messenger *); -struct socket *_kaccept(struct socket *); +int _kaccept(struct socket *, struct ceph_connection *); int _krecvmsg(struct socket *, void *, size_t ); int _ksendmsg(struct socket *, struct kvec *, size_t, size_t); diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index c34ee8fd7b23f..94a0629be4532 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -20,7 +20,6 @@ static void try_write(struct work_struct *); static void try_accept(struct work_struct *); - /* * calculate the number of pages a given length and offset map onto, * if we align the data. @@ -333,7 +332,7 @@ static void prepare_write_accept_reject(struct ceph_connection *con) } /* - * call when socket is writeable + * worker function when socket is writeable */ static void try_write(struct work_struct *work) { @@ -583,7 +582,7 @@ static void process_accept(struct ceph_connection *con) /* - * call when data is available on the socket + * worker function when data is available on the socket */ static void try_read(struct work_struct *work) { @@ -648,55 +647,43 @@ done: /* - * Accepter thread + * worker function when listener receives a connect */ static void try_accept(struct work_struct *work) { - struct socket *sock, *new_sock; - struct sockaddr_in saddr; struct ceph_connection *new_con = NULL; struct ceph_messenger *msgr; - int len; msgr = container_of(work, struct ceph_messenger, awork); - sock = msgr->listen_sock; dout(5, "Entered try_accept\n"); - - if (kernel_accept(sock, &new_sock, sock->file->f_flags) < 0) { - derr(1, "error accepting connection\n"); - goto done; - } - dout(5, "accepted connection \n"); - - /* get the address at the other end */ - memset(&saddr, 0, sizeof(saddr)); - if (new_sock->ops->getname(new_sock, (struct sockaddr *)&saddr, &len, 2)) { - derr(1, "getname error connection aborted\n"); - sock_release(new_sock); - goto done; - } - /* initialize the msgr connection */ new_con = new_connection(msgr); if (new_con == NULL) { derr(1, "malloc failure\n"); - sock_release(new_sock); goto done; } - new_con->sock = new_sock; - set_bit(ACCEPTING, &new_con->state); + + if(_kaccept(msgr->listen_sock, new_con) < 0) { + derr(1, "error accepting connection\n"); + kfree(new_con); + goto done; + } + dout(5, "accepted connection \n"); + new_con->in_tag = CEPH_MSGR_TAG_READY; - /* fill in part of peers address */ - new_con->peer_addr.ipaddr = saddr; prepare_write_accept_announce(msgr, new_con); add_connection_accepting(msgr, new_con); - /* hand off to worker threads , send pending */ - /*?? queue_work(send_wq, &new_con->swork);*/ + set_bit(WRITE_PEND, &new_con->state); + /* + * hand off to worker threads ,should be able to write, we want to + * try to write right away, we may have missed socket state change + */ + queue_work(send_wq, &new_con->swork); done: return; } -- 2.39.5