]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: clean up setup/shutdown; merge ktcp.c into messenger.c
authorSage Weil <sage@newdream.net>
Thu, 19 Jun 2008 03:45:22 +0000 (20:45 -0700)
committerSage Weil <sage@newdream.net>
Thu, 19 Jun 2008 03:45:22 +0000 (20:45 -0700)
src/Makefile.am
src/kernel/Makefile
src/kernel/ktcp.c [deleted file]
src/kernel/ktcp.h [deleted file]
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/proc.c
src/kernel/super.c
src/kernel/super.h

index c6686a55b6e97ddc3349c8ef1aa724b117c74401..70747ad1da6e07ed0a846949122eb6de48112806 100644 (file)
@@ -315,8 +315,6 @@ noinst_HEADERS = \
        kernel/export.c\
        kernel/file.c\
        kernel/inode.c\
-       kernel/ktcp.c\
-       kernel/ktcp.h\
        kernel/mds_client.c\
        kernel/mds_client.h\
        kernel/mdsmap.c\
index 3acce7c2f78590ffdea2452caf0b892202a37c7e..2a6d775c870613fe8c0231f10c8be255eb6a7192 100644 (file)
@@ -7,7 +7,7 @@ ifneq ($(KERNELRELEASE),)
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
 ceph-objs := super.o inode.o dir.o file.o addr.o export.o \
-       ktcp.o messenger.o \
+       messenger.o \
        mds_client.o mdsmap.o \
        mon_client.o \
        osd_client.o osdmap.o crush/crush.o crush/mapper.o \
diff --git a/src/kernel/ktcp.c b/src/kernel/ktcp.c
deleted file mode 100644 (file)
index 3dc9073..0000000
+++ /dev/null
@@ -1,316 +0,0 @@
-#include <linux/socket.h>
-#include <linux/net.h>
-#include <net/tcp.h>
-#include <linux/string.h>
-#include "messenger.h"
-#include "ktcp.h"
-
-int ceph_debug_tcp;
-#define DOUT_VAR ceph_debug_tcp
-#define DOUT_PREFIX "tcp: "
-#include "super.h"
-
-struct workqueue_struct *con_wq;
-
-
-/*
- * socket callback functions
- */
-
-/* listen socket received a connect */
-static void ceph_accept_ready(struct sock *sk, int count_unused)
-{
-       struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
-
-       dout(30, "ceph_accept_ready messenger %p sk_state = %u\n",
-            msgr, sk->sk_state);
-       if (sk->sk_state == TCP_LISTEN)
-               queue_work(con_wq, &msgr->awork);
-}
-
-/* Data available on socket or listen socket received a connect */
-static void ceph_data_ready(struct sock *sk, int count_unused)
-{
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
-       if (sk->sk_state != TCP_CLOSE_WAIT) {
-               dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
-                    con, con->state);
-               ceph_queue_con(con);
-       }
-}
-
-/* socket has bufferspace for writing */
-static void ceph_write_space(struct sock *sk)
-{
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
-
-       dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
-
-       /* only queue to workqueue if a WRITE is pending */
-       if (test_bit(WRITE_PENDING, &con->state)) {
-               dout(30, "ceph_write_space %p queuing write work\n", con);
-               ceph_queue_con(con);
-       }
-
-       /* Since we have our own write_space, Clear the SOCK_NOSPACE flag */
-       clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
-}
-
-/* sockets state has change */
-static void ceph_state_change(struct sock *sk)
-{
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
-
-       dout(30, "ceph_state_change %p state = %lu sk_state = %u\n",
-            con, con->state, sk->sk_state);
-
-       switch (sk->sk_state) {
-       case TCP_CLOSE:
-               dout(30, "ceph_state_change TCP_CLOSE\n");
-       case TCP_CLOSE_WAIT:
-               dout(30, "ceph_state_change TCP_CLOSE_WAIT\n");
-               set_bit(SOCK_CLOSE, &con->state);
-               if (test_bit(CONNECTING, &con->state))
-                       con->error_msg = "connection refused";
-               else
-                       con->error_msg = "socket closed";
-               ceph_queue_con(con);
-               break;
-       case TCP_ESTABLISHED:
-               dout(30, "ceph_state_change TCP_ESTABLISHED\n");
-               ceph_write_space(sk);
-               break;
-       }
-}
-
-/* make a listening socket active by setting up the data ready call back */
-static void listen_sock_callbacks(struct socket *sock,
-                                 struct ceph_messenger *msgr)
-{
-       struct sock *sk = sock->sk;
-       sk->sk_user_data = (void *)msgr;
-       sk->sk_data_ready = ceph_accept_ready;
-}
-
-/* make a socket active by setting up the call back functions */
-static void set_sock_callbacks(struct socket *sock,
-                              struct ceph_connection *con)
-{
-       struct sock *sk = sock->sk;
-       sk->sk_user_data = (void *)con;
-       sk->sk_data_ready = ceph_data_ready;
-       sk->sk_write_space = ceph_write_space;
-       sk->sk_state_change = ceph_state_change;
-}
-
-/*
- * initiate connection to a remote socket.
- */
-struct socket *ceph_tcp_connect(struct ceph_connection *con)
-{
-       int ret;
-       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
-       struct socket *sock;
-
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
-       if (ret)
-               return ERR_PTR(ret);
-
-       con->sock = sock;
-       sock->sk->sk_allocation = GFP_NOFS;
-
-       set_sock_callbacks(sock, con);
-
-       ret = sock->ops->connect(sock, paddr,
-                                sizeof(struct sockaddr_in), O_NONBLOCK);
-       if (ret == -EINPROGRESS) {
-               dout(20, "connect EINPROGRESS sk_state = = %u\n",
-                    sock->sk->sk_state);
-               ret = 0;
-       }
-       if (ret < 0) {
-               /* TBD check for fatal errors, retry if not fatal.. */
-               derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
-                    IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
-               sock_release(sock);
-               con->sock = 0;
-       }
-
-       if (ret < 0)
-               return ERR_PTR(ret);
-       return sock;
-}
-
-/*
- * setup listening socket
- */
-int ceph_tcp_listen(struct ceph_messenger *msgr)
-{
-       int ret;
-       int optval = 1;
-       struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
-       int nlen;
-       struct socket *sock;
-
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
-       if (ret)
-               return ret;
-
-       sock->sk->sk_allocation = GFP_NOFS;
-
-       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)&optval, sizeof(optval));
-       if (ret < 0) {
-               derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
-               goto err;
-       }
-
-       ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
-                             sizeof(*myaddr));
-       if (ret < 0) {
-               derr(0, "Failed to bind: %d\n", ret);
-               goto err;
-       }
-
-       /* what port did we bind to? */
-       nlen = sizeof(*myaddr);
-       ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen,
-                                0);
-       if (ret < 0) {
-               derr(0, "failed to getsockname: %d\n", ret);
-               goto err;
-       }
-       dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
-
-       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
-                               (char *)&optval, sizeof(optval));
-       if (ret < 0) {
-               derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
-               goto err;
-       }
-
-       /* TBD: probaby want to tune the backlog queue .. */
-       ret = sock->ops->listen(sock, NUM_BACKUP);
-       if (ret < 0) {
-               derr(0, "kernel_listen error: %d\n", ret);
-               goto err;
-       }
-
-       /* ok! */
-       msgr->listen_sock = sock;
-       listen_sock_callbacks(sock, msgr);
-       return ret;
-
-err:
-       sock_release(sock);
-       return ret;
-}
-
-/*
- *  accept a connection
- */
-int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
-{
-       int ret;
-       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
-       int len;
-       struct socket *sock;
-       
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
-       if (ret)
-               return ret;
-       con->sock = sock;
-
-       sock->sk->sk_allocation = GFP_NOFS;
-
-       ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
-       if (ret < 0) {
-               derr(0, "accept error: %d\n", ret);
-               goto err;
-       }
-
-       sock->ops = lsock->ops;
-       sock->type = lsock->type;
-       ret = sock->ops->getname(sock, paddr, &len, 2);
-       if (ret < 0) {
-               derr(0, "getname error: %d\n", ret);
-               goto err;
-       }
-
-       /* setup callbacks */
-       set_sock_callbacks(sock, con);
-
-       return ret;
-
-err:
-       sock->ops->shutdown(sock, SHUT_RDWR);
-       sock_release(sock);
-       return ret;
-}
-
-/*
- * receive a message this may return after partial send
- */
-int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
-{
-       struct kvec iov = {buf, len};
-       struct msghdr msg = {.msg_flags = 0};
-       int rlen = 0;           /* length read */
-
-       msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
-       rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
-       return(rlen);
-}
-
-
-/*
- * Send a message this may return after partial send
- */
-int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
-                    size_t kvlen, size_t len, int more)
-{
-       struct msghdr msg = {.msg_flags = 0};
-       int rlen = 0;
-
-       msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
-       if (more)
-               msg.msg_flags |= MSG_MORE;
-       else
-               msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
-
-       /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
-       rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
-       /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
-       return(rlen);
-}
-
-/*
- *  workqueue initialization
- */
-
-int ceph_workqueue_init(void)
-{
-       int ret = 0;
-
-       con_wq = create_workqueue("ceph-net");
-       if (IS_ERR(con_wq)) {
-               derr(0, "net worker failed to start: %d\n", ret);
-               destroy_workqueue(con_wq);
-               ret = PTR_ERR(con_wq);
-               con_wq = 0;
-               return ret;
-       }
-
-       return(ret);
-}
-
-/*
- *  workqueue shutdown
- */
-void ceph_workqueue_shutdown(void)
-{
-       destroy_workqueue(con_wq);
-}
diff --git a/src/kernel/ktcp.h b/src/kernel/ktcp.h
deleted file mode 100644 (file)
index f58ad68..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef _FS_CEPH_TCP_H
-#define _FS_CEPH_TCP_H
-
-extern struct workqueue_struct *con_wq;       /* receive work queue */
-
-/* prototype definitions */
-struct socket *ceph_tcp_connect(struct ceph_connection *);
-int ceph_tcp_listen(struct ceph_messenger *);
-int ceph_tcp_accept(struct socket *, struct ceph_connection *);
-int ceph_tcp_recvmsg(struct socket *, void *, size_t );
-int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more);
-int ceph_workqueue_init(void);
-void ceph_workqueue_shutdown(void);
-
-/* Max number of outstanding connections in listener queueu */
-#define NUM_BACKUP 10
-#endif
index 4e93d59725d92cbd2ab3523325c9b2b76987ca2a..20a508834aea2ce3dfa13a8c55e71dcb11fc4cdc 100644 (file)
@@ -7,7 +7,6 @@
 
 #include "ceph_fs.h"
 #include "messenger.h"
-#include "ktcp.h"
 
 int ceph_debug_msgr;
 #define DOUT_VAR ceph_debug_msgr
@@ -25,20 +24,311 @@ static char tag_ack = CEPH_MSGR_TAG_ACK;
 static void con_work(struct work_struct *);
 static void try_accept(struct work_struct *);
 
+
 /*
- * connections
+ *  workqueue
  */
+struct workqueue_struct *ceph_msgr_wq;
 
-static void con_close_socket(struct ceph_connection *con)
+int ceph_msgr_init(void)
 {
-       dout(10, "con_close_socket on %p sock %p\n", con, con->sock);
-       if (!con->sock)
-               return;
-       con->sock->ops->shutdown(con->sock, SHUT_RDWR);
-       sock_release(con->sock);
-       con->sock = 0;
+       ceph_msgr_wq = create_workqueue("ceph-msgr");
+       if (IS_ERR(ceph_msgr_wq)) {
+               int ret = PTR_ERR(ceph_msgr_wq);
+               derr(0, "failed to create workqueue: %d\n", ret);
+               ceph_msgr_wq = 0;
+               return ret;
+       }
+       return 0;
+}
+
+void ceph_msgr_exit(void)
+{
+       destroy_workqueue(ceph_msgr_wq);
+}
+
+
+/*
+ * socket callback functions
+ */
+
+/* listen socket received a connect */
+static void ceph_accept_ready(struct sock *sk, int count_unused)
+{
+       struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
+
+       dout(30, "ceph_accept_ready messenger %p sk_state = %u\n",
+            msgr, sk->sk_state);
+       if (sk->sk_state == TCP_LISTEN)
+               queue_work(ceph_msgr_wq, &msgr->awork);
+}
+
+/* Data available on socket or listen socket received a connect */
+static void ceph_data_ready(struct sock *sk, int count_unused)
+{
+       struct ceph_connection *con =
+               (struct ceph_connection *)sk->sk_user_data;
+       if (sk->sk_state != TCP_CLOSE_WAIT) {
+               dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
+                    con, con->state);
+               ceph_queue_con(con);
+       }
+}
+
+/* socket has bufferspace for writing */
+static void ceph_write_space(struct sock *sk)
+{
+       struct ceph_connection *con =
+               (struct ceph_connection *)sk->sk_user_data;
+
+       dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
+
+       /* only queue to workqueue if a WRITE is pending */
+       if (test_bit(WRITE_PENDING, &con->state)) {
+               dout(30, "ceph_write_space %p queuing write work\n", con);
+               ceph_queue_con(con);
+       }
+
+       /* Since we have our own write_space, Clear the SOCK_NOSPACE flag */
+       clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+}
+
+/* sockets state has change */
+static void ceph_state_change(struct sock *sk)
+{
+       struct ceph_connection *con =
+               (struct ceph_connection *)sk->sk_user_data;
+
+       dout(30, "ceph_state_change %p state = %lu sk_state = %u\n",
+            con, con->state, sk->sk_state);
+
+       switch (sk->sk_state) {
+       case TCP_CLOSE:
+               dout(30, "ceph_state_change TCP_CLOSE\n");
+       case TCP_CLOSE_WAIT:
+               dout(30, "ceph_state_change TCP_CLOSE_WAIT\n");
+               set_bit(SOCK_CLOSE, &con->state);
+               if (test_bit(CONNECTING, &con->state))
+                       con->error_msg = "connection refused";
+               else
+                       con->error_msg = "socket closed";
+               ceph_queue_con(con);
+               break;
+       case TCP_ESTABLISHED:
+               dout(30, "ceph_state_change TCP_ESTABLISHED\n");
+               ceph_write_space(sk);
+               break;
+       }
+}
+
+/* make a listening socket active by setting up the data ready call back */
+static void listen_sock_callbacks(struct socket *sock,
+                                 struct ceph_messenger *msgr)
+{
+       struct sock *sk = sock->sk;
+       sk->sk_user_data = (void *)msgr;
+       sk->sk_data_ready = ceph_accept_ready;
+}
+
+/* make a socket active by setting up the call back functions */
+static void set_sock_callbacks(struct socket *sock,
+                              struct ceph_connection *con)
+{
+       struct sock *sk = sock->sk;
+       sk->sk_user_data = (void *)con;
+       sk->sk_data_ready = ceph_data_ready;
+       sk->sk_write_space = ceph_write_space;
+       sk->sk_state_change = ceph_state_change;
+}
+
+
+/*
+ * socket helpers
+ */
+
+/*
+ * initiate connection to a remote socket.
+ */
+struct socket *ceph_tcp_connect(struct ceph_connection *con)
+{
+       int ret;
+       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+       struct socket *sock;
+
+       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+       if (ret)
+               return ERR_PTR(ret);
+
+       con->sock = sock;
+       sock->sk->sk_allocation = GFP_NOFS;
+
+       set_sock_callbacks(sock, con);
+
+       ret = sock->ops->connect(sock, paddr,
+                                sizeof(struct sockaddr_in), O_NONBLOCK);
+       if (ret == -EINPROGRESS) {
+               dout(20, "connect EINPROGRESS sk_state = = %u\n",
+                    sock->sk->sk_state);
+               ret = 0;
+       }
+       if (ret < 0) {
+               /* TBD check for fatal errors, retry if not fatal.. */
+               derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
+                    IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
+               sock_release(sock);
+               con->sock = 0;
+       }
+
+       if (ret < 0)
+               return ERR_PTR(ret);
+       return sock;
+}
+
+/*
+ * setup listening socket
+ */
+int ceph_tcp_listen(struct ceph_messenger *msgr)
+{
+       int ret;
+       int optval = 1;
+       struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
+       int nlen;
+       struct socket *sock;
+
+       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+       if (ret)
+               return ret;
+
+       sock->sk->sk_allocation = GFP_NOFS;
+
+       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                               (char *)&optval, sizeof(optval));
+       if (ret < 0) {
+               derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
+               goto err;
+       }
+
+       ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
+                             sizeof(*myaddr));
+       if (ret < 0) {
+               derr(0, "Failed to bind: %d\n", ret);
+               goto err;
+       }
+
+       /* what port did we bind to? */
+       nlen = sizeof(*myaddr);
+       ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen,
+                                0);
+       if (ret < 0) {
+               derr(0, "failed to getsockname: %d\n", ret);
+               goto err;
+       }
+       dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
+
+       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+                               (char *)&optval, sizeof(optval));
+       if (ret < 0) {
+               derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
+               goto err;
+       }
+
+       /* TBD: probaby want to tune the backlog queue .. */
+       ret = sock->ops->listen(sock, CEPH_MSGR_BACKUP);
+       if (ret < 0) {
+               derr(0, "kernel_listen error: %d\n", ret);
+               goto err;
+       }
+
+       /* ok! */
+       msgr->listen_sock = sock;
+       listen_sock_callbacks(sock, msgr);
+       return ret;
+
+err:
+       sock_release(sock);
+       return ret;
 }
 
+/*
+ *  accept a connection
+ */
+int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
+{
+       int ret;
+       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+       int len;
+       struct socket *sock;
+       
+       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+       if (ret)
+               return ret;
+       con->sock = sock;
+
+       sock->sk->sk_allocation = GFP_NOFS;
+
+       ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
+       if (ret < 0) {
+               derr(0, "accept error: %d\n", ret);
+               goto err;
+       }
+
+       sock->ops = lsock->ops;
+       sock->type = lsock->type;
+       ret = sock->ops->getname(sock, paddr, &len, 2);
+       if (ret < 0) {
+               derr(0, "getname error: %d\n", ret);
+               goto err;
+       }
+
+       /* setup callbacks */
+       set_sock_callbacks(sock, con);
+
+       return ret;
+
+err:
+       sock->ops->shutdown(sock, SHUT_RDWR);
+       sock_release(sock);
+       return ret;
+}
+
+/*
+ * receive a message this may return after partial send
+ */
+int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
+{
+       struct kvec iov = {buf, len};
+       struct msghdr msg = {.msg_flags = 0};
+       int rlen = 0;           /* length read */
+
+       msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
+       rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
+       return(rlen);
+}
+
+
+/*
+ * Send a message this may return after partial send
+ */
+int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
+                    size_t kvlen, size_t len, int more)
+{
+       struct msghdr msg = {.msg_flags = 0};
+       int rlen = 0;
+
+       msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
+       if (more)
+               msg.msg_flags |= MSG_MORE;
+       else
+               msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
+
+       /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
+       rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
+       /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
+       return(rlen);
+}
+
+
+
 /*
  * create a new connection.  initial state is NEW.
  */
@@ -113,6 +403,18 @@ yes:
 }
 
 
+/*
+ * close connection socket
+ */
+static void con_close_socket(struct ceph_connection *con)
+{
+       dout(10, "con_close_socket on %p sock %p\n", con, con->sock);
+       if (!con->sock)
+               return;
+       con->sock->ops->shutdown(con->sock, SHUT_RDWR);
+       sock_release(con->sock);
+       con->sock = 0;
+}
 
 /*
  * drop a reference
@@ -247,8 +549,8 @@ void ceph_queue_con(struct ceph_connection *con)
        
        set_bit(QUEUED, &con->state);
        if (test_bit(BUSY, &con->state) ||
-           !queue_work(con_wq, &con->work.work)) {
-               dout(40, "ceph_queue_write %p - already BUSY or queued\n", con);
+           !queue_work(ceph_msgr_wq, &con->work.work)) {
+               dout(40, "ceph_queue_con %p - already BUSY or queued\n", con);
                put_connection(con);
        }
 }
@@ -261,7 +563,7 @@ void ceph_queue_con_delayed(struct ceph_connection *con)
             atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        set_bit(QUEUED, &con->state);
        if (test_bit(BUSY, &con->state) ||
-           !queue_delayed_work(con_wq, &con->work,
+           !queue_delayed_work(ceph_msgr_wq, &con->work,
                                round_jiffies_relative(con->delay))) {
                dout(40, "ceph_queue_con_delayed %p - already BUSY or queued\n",
                     con);
@@ -1616,4 +1918,3 @@ void ceph_msg_put(struct ceph_msg *m)
        }
 }
 
-
index 7f339e1477824096e49cce630870d8068beed03a..c2f5092b6386eeba6598d8ca60051719743ce139 100644 (file)
@@ -13,6 +13,8 @@
 
 struct ceph_msg;
 
+extern struct workqueue_struct *ceph_msgr_wq;       /* receive work queue */
+
 typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
 typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_name *pn);
 typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
@@ -29,6 +31,8 @@ static __inline__ const char *ceph_name_type_str(int t) {
        }
 }
 
+#define CEPH_MSGR_BACKUP 10  /* backlogged incoming connections */
+
 /* use format string %s%d */
 #define ENTITY_NAME(n)                            \
        ceph_name_type_str(le32_to_cpu((n).type)), \
@@ -131,6 +135,8 @@ struct ceph_connection {
        unsigned long       delay;          /* delay interval */
 };
 
+extern int ceph_msgr_init(void);
+extern void ceph_msgr_exit(void);
 
 extern struct ceph_messenger *
 ceph_messenger_create(struct ceph_entity_addr *myaddr);
index 6c450da2ed63e7d557ff19230f00d4de03d6b679..e0dc14c33fcaa004a518f47877131c3e33142f88 100644 (file)
@@ -58,14 +58,13 @@ static int ceph_debug_level_write(struct file *file, const char __user *buffer,
 
 static struct proc_dir_entry *proc_fs_ceph;
 
-void ceph_proc_init(void)
+int ceph_proc_init(void)
 {
        struct proc_dir_entry *pde;
 
        proc_fs_ceph = proc_mkdir("ceph", proc_root_fs);
-
        if (!proc_fs_ceph)
-               return;
+               return -ENOMEM;
 
        proc_fs_ceph->owner = THIS_MODULE;
        pde = create_proc_read_entry("debug", 0,
@@ -84,6 +83,7 @@ void ceph_proc_init(void)
        if (pde)
                pde->write_proc = ceph_debug_level_write;
 
+       return 0;
 }
 
 void ceph_proc_cleanup()
index be3954651fc21f90dfda9e7a6de066de946060ff..30d4557857015bd3ba69a60e85b0b22462fe3df9 100644 (file)
@@ -29,7 +29,6 @@ int ceph_debug_super = -1;
 #define DOUT_VAR ceph_debug_super
 #define DOUT_PREFIX "super: "
 #include "super.h"
-#include "ktcp.h"
 
 #include <linux/statfs.h>
 #include "mon_client.h"
@@ -335,7 +334,6 @@ enum {
        Opt_debug,
        Opt_debug_console,
        Opt_debug_msgr,
-       Opt_debug_tcp,
        Opt_debug_mdsc,
        Opt_debug_osdc,
        Opt_debug_addr,
@@ -358,7 +356,6 @@ static match_table_t arg_tokens = {
        {Opt_fsidminor, "fsidminor=%ld"},
        {Opt_debug, "debug=%d"},
        {Opt_debug_msgr, "debug_msgr=%d"},
-       {Opt_debug_tcp, "debug_tcp=%d"},
        {Opt_debug_mdsc, "debug_mdsc=%d"},
        {Opt_debug_osdc, "debug_osdc=%d"},
        {Opt_debug_addr, "debug_addr=%d"},
@@ -509,9 +506,6 @@ static int parse_mount_args(int flags, char *options, const char *dev_name,
                case Opt_debug_msgr:
                        ceph_debug_msgr = intval;
                        break;
-               case Opt_debug_tcp:
-                       ceph_debug_tcp = intval;
-                       break;
                case Opt_debug_mdsc:
                        ceph_debug_mdsc = intval;
                        break;
@@ -560,27 +554,6 @@ static int parse_mount_args(int flags, char *options, const char *dev_name,
        return 0;
 }
 
-/*
- * share work queue between clients.
- */
-atomic_t ceph_num_clients = ATOMIC_INIT(0);
-
-static void get_client_counter(void)
-{
-       if (atomic_add_return(1, &ceph_num_clients) == 1) {
-               dout(10, "first client, setting up workqueues\n");
-               ceph_workqueue_init();
-       }
-}
-
-static void put_client_counter(void)
-{
-       if (atomic_dec_and_test(&ceph_num_clients)) {
-               dout(10, "last client, shutting down workqueues\n");
-               ceph_workqueue_shutdown();
-       }
-}
-
 /*
  * create a fresh client instance
  */
@@ -602,9 +575,6 @@ struct ceph_client *ceph_create_client(void)
 
        cl->msgr = 0;
 
-       /* start work queues? */
-       get_client_counter();
-
        cl->wb_wq = create_workqueue("ceph-writeback");
        if (cl->wb_wq == 0)
                goto fail;
@@ -623,7 +593,6 @@ struct ceph_client *ceph_create_client(void)
 
 fail:
        /* fixme: use type->init() eventually */
-       put_client_counter();
        return ERR_PTR(-ENOMEM);
 }
 
@@ -647,7 +616,6 @@ void ceph_destroy_client(struct ceph_client *cl)
                destroy_workqueue(cl->trunc_wq);
        if (cl->msgr)
                ceph_messenger_destroy(cl->msgr);
-       put_client_counter();
        kfree(cl);
        dout(10, "destroy_client %p done\n", cl);
 }
@@ -985,18 +953,40 @@ static int __init init_ceph(void)
        dout(1, "init_ceph\n");
 
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
+       ret = -ENOMEM;
        ceph_kobj = kobject_create_and_add("ceph", fs_kobj);
        if (!ceph_kobj)
-               return -ENOMEM;
+               goto out;
 #endif
-       ceph_proc_init();
+
+       ret = ceph_proc_init();
+       if (ret < 0)
+               goto out_kobj;
+
+       ret = ceph_msgr_init();
+       if (ret < 0)
+               goto out_proc;
 
        ret = init_inodecache();
        if (ret)
-               goto out;
+               goto out_msgr;
+
        ret = register_filesystem(&ceph_fs_type);
        if (ret)
-               destroy_inodecache();
+               goto out_icache;
+       return 0;
+
+out_icache:
+       destroy_inodecache();   
+out_msgr:
+       ceph_msgr_exit();
+out_proc:
+       ceph_proc_cleanup();
+out_kobj:
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
+       kobject_put(ceph_kobj);
+       ceph_kobj = 0;
+#endif
 out:
        return ret;
 }
@@ -1004,15 +994,14 @@ out:
 static void __exit exit_ceph(void)
 {
        dout(1, "exit_ceph\n");
-
+       unregister_filesystem(&ceph_fs_type);
+       destroy_inodecache();
+       ceph_msgr_exit();
+       ceph_proc_cleanup();
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
        kobject_put(ceph_kobj);
        ceph_kobj = 0;
 #endif
-       ceph_proc_cleanup();
-
-       unregister_filesystem(&ceph_fs_type);
-       destroy_inodecache();
 }
 
 module_init(init_ceph);
index 4d74faab9dfde90f99e399e1478c5c318d12f59f..df6452889cf5a351d611929c8a3faed9fc345261 100644 (file)
@@ -18,7 +18,6 @@ extern int ceph_debug_console;
 extern int ceph_debug;
 extern int ceph_debug_msgr;
 extern int ceph_debug_super;
-extern int ceph_debug_tcp;
 extern int ceph_debug_mdsc;
 extern int ceph_debug_osdc;
 extern int ceph_debug_addr;
@@ -522,7 +521,7 @@ static inline void ceph_init_dentry(struct dentry *dentry) {
 extern const struct export_operations ceph_export_ops;
 
 /* proc.c */
-extern void ceph_proc_init(void);
+extern int ceph_proc_init(void);
 extern void ceph_proc_cleanup(void);
 
 #endif /* _FS_CEPH_SUPER_H */