]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2118 29311d96-e01e-0410-9327-a35deaa...
authorpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 27 Nov 2007 18:26:18 +0000 (18:26 +0000)
committerpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 27 Nov 2007 18:26:18 +0000 (18:26 +0000)
trunk/ceph/kernel/test/Makefile
trunk/ceph/kernel/test/kernclient.c
trunk/ceph/kernel/test/kernserver.c [new file with mode: 0644]
trunk/ceph/kernel/test/ktcp.c
trunk/ceph/kernel/test/ktcp.h
trunk/ceph/kernel/test/messenger_mini.c
trunk/ceph/kernel/test/userclient.c

index 9582bc5af15f8f40d5a50ad4e1873bed06814e48..f45e9d04baa4ea89a7ec069d9b9237dda4867260 100644 (file)
@@ -4,4 +4,4 @@
 
 obj-$(CONFIG_CEPHTESTS_FS) += ksocktest.o
 
-ksocktest-objs := kernclient.o messenger_mini.o ktcp.o
+ksocktest-objs := kernserver.o messenger_mini.o ktcp.o
index b5acc0aacde749ea18781798b94d1571e8868d36..568f1685e43db9dba1cecf90ffbcf7f74b47f9e2 100644 (file)
@@ -18,9 +18,6 @@ MODULE_AUTHOR("Patience Warnick <patience@newdream.net>");
 MODULE_DESCRIPTION("kernel thread test for Linux");
 MODULE_LICENSE("GPL");
 
-int work_init(void);
-void shutdown_workqueues(void);
-int add_sock_callbacks(struct socket *, struct ceph_connection *);
 int inet_aton (const char *cp, struct in_addr *addr);
 struct task_struct *thread;
 
@@ -60,7 +57,7 @@ static int sock_thread(void *unusedfornow)
 
        printk(KERN_INFO "about to connect to server\n");
        /* connect to server */
-       if ((ret = do_connect(con))) {
+       if ((ret = _kconnect(con))) {
                printk(KERN_INFO "error connecting %d\n", ret);
                goto done;
        }
diff --git a/trunk/ceph/kernel/test/kernserver.c b/trunk/ceph/kernel/test/kernserver.c
new file mode 100644 (file)
index 0000000..5c6b0b2
--- /dev/null
@@ -0,0 +1,142 @@
+#include <linux/module.h>
+#include <linux/kthread.h>
+#include <linux/socket.h>
+#include <linux/net.h>
+#include <linux/string.h>
+#include <net/tcp.h>
+
+#include <linux/ceph_fs.h>
+#include "messenger.h"
+#include "ktcp.h"
+
+
+#define PORT 9009
+#define HOST cranium
+#define cranium 192.168.1.3
+
+MODULE_AUTHOR("Patience Warnick <patience@newdream.net>");
+MODULE_DESCRIPTION("kernel thread test for Linux");
+MODULE_LICENSE("GPL");
+
+struct ceph_messenger *ceph_messenger_create(void);
+int inet_aton (const char *cp, struct in_addr *addr);
+struct task_struct *thread;
+
+
+/*
+ *  Wake up a thread when there is work to do
+ */
+/*  void thread_wake_up(void)
+{
+}
+*/
+
+/*
+ * Client connect thread
+ */
+static int listen_thread(void *unusedfornow)
+{
+       struct ceph_messenger *msgr = NULL;
+
+       printk(KERN_INFO "starting kernel listen thread\n");
+
+       msgr = ceph_messenger_create();
+       if (msgr == NULL) return 1;
+
+       printk(KERN_INFO "about to listen\n");
+
+       set_current_state(TASK_INTERRUPTIBLE);
+        /* an endless loop in which we are doing our work */
+       while (!kthread_should_stop())
+        {
+               set_current_state(TASK_INTERRUPTIBLE);
+               schedule();
+               printk(KERN_INFO "sock thread has been interrupted\n");
+       }
+
+       set_current_state(TASK_RUNNING);
+        sock_release(msgr->listen_sock);
+        kfree(msgr);
+       printk(KERN_INFO "kernel thread exiting\n");
+       return 0;
+}
+/*
+ * Client connect thread
+ */
+static int connect_thread(void *unusedfornow)
+{
+       struct ceph_messenger *msgr = NULL;
+       struct ceph_connection *con;
+       struct sockaddr_in saddr;
+       int ret;
+
+       printk(KERN_INFO "starting kernel thread\n");
+
+       con = new_connection(msgr);
+
+       /* inet_aton("192.168.1.3", &saddr.sin_addr); */
+       /* con->peer_addr.ipaddr.sin_addr = saddr.sin_addr; */
+
+        saddr.sin_family = AF_INET;
+        saddr.sin_addr.s_addr = htonl(INADDR_ANY);
+        saddr.sin_port = htons(PORT);
+       printk(KERN_INFO "saddr info %p\n", &saddr);
+       con->peer_addr.ipaddr = saddr;
+
+
+       set_bit(WRITE_PEND, &con->state);
+
+       printk(KERN_INFO "about to connect to server\n");
+       /* connect to server */
+       if ((ret = _kconnect(con))) {
+               printk(KERN_INFO "error connecting %d\n", ret);
+               goto done;
+       }
+       printk(KERN_INFO "connect succeeded\n");
+
+       set_current_state(TASK_INTERRUPTIBLE);
+        /* an endless loop in which we are doing our work */
+       while (!kthread_should_stop())
+        {
+               set_current_state(TASK_INTERRUPTIBLE);
+               schedule();
+               printk(KERN_INFO "sock thread has been interrupted\n");
+       }
+
+       set_current_state(TASK_RUNNING);
+        sock_release(con->sock);
+done:
+        kfree(con);
+       printk(KERN_INFO "kernel thread exiting\n");
+       return 0;
+}
+
+static int __init init_kst(void)
+{
+       int ret;
+
+       printk(KERN_INFO "kernel thread test init\n");
+       /* create new kernel threads */
+       ret = work_init();
+       thread = kthread_run(listen_thread, NULL, "listen-thread");
+               if (IS_ERR(thread))
+               {
+                       printk(KERN_INFO "failured to start kernel thread\n");
+                       return -ENOMEM;
+               }
+       return 0;
+}
+
+static void __exit exit_kst(void)
+{
+       printk(KERN_INFO "kernel thread test exit\n");
+       shutdown_workqueues();
+       kthread_stop(thread);
+       wake_up_process(thread);
+
+       return;
+}
+
+
+module_init(init_kst);
+module_exit(exit_kst);
index 12decbef93fcb29822989b9f5e2b70b426fea9a6..b96f506817c081f128f8ae11350341b58edbe999 100644 (file)
 #include "messenger.h"
 #include "ktcp.h"
 
+struct workqueue_struct *recv_wq;       /* receive work queue */
+struct workqueue_struct *send_wq;              /* send work queue */
 
-struct socket * _kconnect(struct sockaddr *saddr)
+/*
+ * socket callback functions 
+ */
+/* Data available on socket or listen socket received a connect */
+static void ceph_data_ready(struct sock *sk, int count_unused)
 {
-       int ret;
-       struct socket *sd = NULL;
+        struct ceph_connection *con;
+        struct ceph_messenger *msgr;
 
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
-        if (ret < 0) {
-               printk(KERN_INFO "sock_create_kern error: %d\n", ret);
-               return NULL;
+        printk(KERN_INFO "Entered ceph_data_ready \n");
+
+       if (sk->sk_state == TCP_LISTEN) {
+               msgr = (struct ceph_messenger *)sk->sk_user_data;
+               queue_work(recv_wq, &msgr->awork);
+       } else {
+               con = (struct ceph_connection *)sk->sk_user_data;
+               queue_work(recv_wq, &con->rwork);
        }
+}
+
+/* socket has bufferspace for writing */
+static void ceph_write_space(struct sock *sk)
+{
+        struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
 
-       ret = sd->ops->connect(sd, (struct sockaddr *) saddr,
-                               sizeof (struct sockaddr_in),0);
+        printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state);
+        if (test_bit(WRITE_PEND, &con->state)) {
+                printk(KERN_INFO "WRITE_PEND set in connection\n");
+                queue_work(send_wq, &con->swork);
+        }
+}
+
+/* sockets state has change */
+static void ceph_state_change(struct sock *sk)
+{
+        struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
+        printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state);
+
+        if (sk->sk_state == TCP_ESTABLISHED) {
+                if (test_and_clear_bit(CONNECTING, &con->state) ||
+                   test_bit(ACCEPTING, &con->state))
+                        set_bit(OPEN, &con->state);
+                ceph_write_space(sk);
+        }
+}
+
+/* make a socket active by setting up the call back functions */
+int add_sock_callbacks(struct socket *sock, struct ceph_connection *con)
+{
+        struct sock *sk = sock->sk;
+        sk->sk_user_data = con;
+        printk(KERN_INFO "Entered add_sock_callbacks\n");
+
+        /* Install callbacks */
+        sk->sk_data_ready = ceph_data_ready;
+        sk->sk_write_space = ceph_write_space;
+        sk->sk_state_change = ceph_state_change;
+
+        return 0;
+}
 /*
-       ret = sd->ops->connect(sd, (struct sockaddr *) saddr,
-                               sizeof (struct sockaddr_in),O_NONBLOCK);
+ * initiate connection to a remote socket.
+ */
+int _kconnect(struct ceph_connection *con)
+{
+       int ret;
+       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
 
-       if (ret == -EINPROGRESS) {
-               printk(KERN_INFO "non-blocking connect in progress: %d\n", ret);
-               goto done;
-       }
-*/
+        set_bit(CONNECTING, &con->state);
 
+        ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
         if (ret < 0) {
-               printk(KERN_INFO "kernel_connect error: %d\n", ret);
-               sock_release(sd);
-       }
+                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+                goto done;
+        }
+
+        /* setup callbacks */
+        add_sock_callbacks(con->sock, con);
+
+
+        ret = con->sock->ops->connect(con->sock, paddr,
+                                      sizeof(struct sockaddr_in), O_NONBLOCK);
+        if (ret == -EINPROGRESS) return 0;
+        if (ret < 0) {
+                /* TBD check for fatal errors, retry if not fatal.. */
+                printk(KERN_INFO "kernel_connect error: %d\n", ret);
+                sock_release(con->sock);
+                con->sock = NULL;
+        }
 done:
-       return(sd);
+        return ret;
 }
 
-struct socket * _klisten(struct sockaddr *saddr)
+/*
+ * setup listening socket
+ */
+int _klisten(struct ceph_messenger *msgr)
 {
        int ret;
-       struct socket *sd = NULL;
-       struct sockaddr_in *in_addr = (struct sockaddr_in *)saddr;
+       struct socket *sock = NULL;
+       int optval = 1;
+       struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
 
 
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
+       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
         if (ret < 0) {
                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
-               return(NULL);
+               return ret;
+       }
+       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                               (char *)&optval, sizeof(optval)); 
+       if (ret < 0) {
+               printk("Failed to set SO_REUSEADDR: %d\n", ret);
+               goto err;
        }
 
+       /* set user_data to be the messenger */
+       sock->sk->sk_user_data = msgr;
+
        /* no user specified address given so create, will allow arg to mount */
-       if (!in_addr->sin_addr.s_addr) {
-               in_addr->sin_family = AF_INET;
-               in_addr->sin_addr.s_addr = htonl(INADDR_ANY);
-               in_addr->sin_port = htons(CEPH_PORT);  /* known port for now */
-               /* in_addr->sin_port = htons(0); */  /* any port */
+       myaddr->sin_family = AF_INET;
+       myaddr->sin_addr.s_addr = htonl(INADDR_ANY);
+       myaddr->sin_port = htons(CEPH_PORT);  /* known port for now */
+       /* myaddr->sin_port = htons(0); */  /* any port */
+       ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, 
+                               sizeof(struct sockaddr_in));
+       if (ret < 0) {
+               printk("Failed to bind to port %d\n", ret);
+               goto err;
        }
 
-/* TBD: set sock options... */
-       /*  ret = kernel_setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)optval, optlen); 
+       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+                               (char *)&optval, sizeof(optval)); 
        if (ret < 0) {
-               printk("Failed to set SO_REUSEADDR: %d\n", ret);
-       }  */
-       ret = sd->ops->bind(sd, saddr, sizeof(saddr));
-/* TBD: probaby want to tune the backlog queue .. */
-       ret = sd->ops->listen(sd, NUM_BACKUP);
+               printk("Failed to set SO_KEEPALIVE: %d\n", ret);
+               goto err;
+       }
+
+       msgr->listen_sock = sock;
+
+       /* TBD: probaby want to tune the backlog queue .. */
+       ret = sock->ops->listen(sock, NUM_BACKUP);
        if (ret < 0) {
                printk(KERN_INFO "kernel_listen error: %d\n", ret);
-               sock_release(sd);
-               sd = NULL;
+               msgr->listen_sock = NULL;
+               goto err;
        }
-       return(sd);
+       return ret;
+err:
+       sock_release(sock);
+       return ret;
 }
 
 /*
- * Note: Maybe don't need this, or make inline... keep for now for debugging..
- * we may need to add more functionality
+ *  accept a connection
  */
-struct socket *_kaccept(struct socket *sd)
+int _kaccept(struct socket *sock, struct ceph_connection *con)
 {
-       struct socket *new_sd = NULL;
        int ret;
+       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+       int len;
 
-       
-       ret = kernel_accept(sd, &new_sd, sd->file->f_flags);
+
+        ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
         if (ret < 0) {
-               printk(KERN_INFO "kernel_accept error: %d\n", ret);
-               return(new_sd);
+                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+                goto done;
+        }
+
+        /* setup callbacks */
+        add_sock_callbacks(con->sock, con);
+
+
+        ret = sock->ops->accept(sock, con->sock, O_NONBLOCK);
+       /* ret = kernel_accept(sock, &new_sock, sock->file->f_flags); */
+        if (ret < 0) {
+               printk(KERN_INFO "accept error: %d\n", ret);
+               goto err;
+       }
+       con->sock->ops = sock->ops;
+       con->sock->type = sock->type;
+       ret = con->sock->ops->getname(con->sock, paddr, &len, 2);
+        if (ret < 0) {
+               printk(KERN_INFO "getname error: %d\n", ret);
+               goto err;
        }
-/* TBD:  shall we check name for validity?  */
-       return(new_sd);
+
+        set_bit(ACCEPTING, &con->state);
+done:
+       return ret;
+err:
+       sock_release(con->sock);
+       return ret;
 }
 
 /*
  * receive a message this may return after partial send
  */
-int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags)
+int _krecvmsg(struct socket *sock, void *buf, size_t len)
 {
        struct kvec iov = {buf, len};
-       struct msghdr msg = {.msg_flags = msgflags};
+       struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;           /* length read */
 
        printk(KERN_INFO "entered krevmsg\n");
        msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
 
        /* receive one kvec for now...  */
-       rlen = kernel_recvmsg(sd, &msg, &iov, 1, len, msg.msg_flags);
+       rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
         if (rlen < 0) {
                printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen);
         }
@@ -120,32 +229,64 @@ int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags)
 /*
  * Send a message this may return after partial send
  */
-int _ksendmsg(struct socket *sd, struct kvec *iov, 
-               size_t kvlen, size_t len, unsigned msgflags)
+int _ksendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len)
 {
-       struct msghdr msg = {.msg_flags = msgflags};
+       struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;
 
        printk(KERN_INFO "entered ksendmsg\n");
        msg.msg_flags |=  MSG_DONTWAIT | MSG_NOSIGNAL;
 
-       rlen = kernel_sendmsg(sd, &msg, iov, kvlen, len);
+       rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
         if (rlen < 0) {
                printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen);
         }
        return(rlen);
 }
 
-struct sockaddr *_kgetname(struct socket *sd)
+/*
+ *  workqueue initialization
+ */
+
+int work_init(void)
 {
-       struct sockaddr *saddr = NULL;
-       int len;
-       int ret;
+        int ret = 0;
 
-       if ((ret = sd->ops->getname(sd, (struct sockaddr *)saddr,
-                                       &len, 2) < 0)) {
-               printk(KERN_INFO "kernel getname error: %d\n", ret);
-       }
-       return(saddr);
+        printk(KERN_INFO "entered work_init\n");
+        /*
+         * Create a num CPU threads to handle receive requests
+         * note: we can create more threads if needed to even out
+         * the scheduling of multiple requests..
+         */
+        recv_wq = create_workqueue("ceph-recv");
+        ret = IS_ERR(recv_wq);
+        if (ret) {
+                printk(KERN_INFO "receive worker failed to start: %d\n", ret);
+                destroy_workqueue(recv_wq);
+                return ret;
+        }
 
+        /*
+         * Create a single thread to handle send requests
+         * note: may use same thread pool as receive workers later...
+         */
+        send_wq = create_singlethread_workqueue("ceph-send");
+        ret = IS_ERR(send_wq);
+        if (ret) {
+                printk(KERN_INFO "send worker failed to start: %d\n", ret);
+                destroy_workqueue(send_wq);
+                return ret;
+        }
+        printk(KERN_INFO "successfully created wrkqueues\n");
+
+        return(ret);
+}
+
+/*
+ *  workqueue shutdown
+ */
+void shutdown_workqueues(void)
+{
+        destroy_workqueue(send_wq);
+        destroy_workqueue(recv_wq);
 }
index a6f6d1f0a8b57570116ee8c5731cfd9d39a357e8..841dd7bba02fd947cc366e2cc8e6285fdd310521 100644 (file)
@@ -1,12 +1,17 @@
 #ifndef _FS_CEPH_TCP_H
 #define _FS_CEPH_TCP_H
 
+extern struct workqueue_struct *recv_wq;       /* receive work queue */
+extern struct workqueue_struct *send_wq;       /* send work queue */
+
 /* prototype definitions */
-struct socket * _kconnect(struct sockaddr *);
-struct socket * _klisten(struct sockaddr *);
-struct socket *_kaccept(struct socket *);
-int _krecvmsg(struct socket *, void *, size_t , unsigned);
-int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned);
+int _kconnect(struct ceph_connection *);
+int _klisten(struct ceph_messenger *);
+int _kaccept(struct socket *, struct ceph_connection *);
+int _krecvmsg(struct socket *, void *, size_t );
+int _ksendmsg(struct socket *, struct kvec *, size_t, size_t);
+int work_init(void);
+void shutdown_workqueues(void);
 
 /* Well known port for ceph client listener.. */
 #define CEPH_PORT 2002
index 5d649d446e263dbc2dc571e72bb6ca59c58b5ae5..b57de70039759054c5c2d2182f5ba7787b3800da 100644 (file)
@@ -8,9 +8,6 @@
 #include "messenger.h"
 #include "ktcp.h"
 
-static struct workqueue_struct *recv_wq;        /* receive work queue */
-static struct workqueue_struct *send_wq;        /* send work queue */
-
 static void try_read(struct work_struct *);
 static void try_write(struct work_struct *);
 static void try_accept(struct work_struct *);
@@ -43,42 +40,6 @@ struct ceph_connection *new_connection(struct ceph_messenger *msgr)
        return con;
 }
 
-/*
- * initiate connection to a remote socket.
- *
- */
-int do_connect(struct ceph_connection *con)
-{
-        struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
-       int ret = 0;
-
-
-       set_bit(CONNECTING, &con->state);
-
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
-       if (ret < 0) {
-                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
-                goto done;
-        }
-
-        /* setup callbacks */
-        add_sock_callbacks(con->sock, con);
-
-
-        ret = con->sock->ops->connect(con->sock, paddr, 
-                                     sizeof(struct sockaddr_in),O_NONBLOCK);
-       if (ret == -EINPROGRESS) return 0;
-        if (ret < 0) {
-               /* TBD check for fatal errors, retry if not fatal.. */
-                printk(KERN_INFO "kernel_connect error: %d\n", ret);
-                sock_release(con->sock);
-               con->sock = NULL;
-        }
-done:
-        printk(KERN_INFO "do_connect state = %u\n", con->state);
-        return ret;
-}
-
 /*
  * call when socket is writeable
  */
@@ -106,7 +67,7 @@ static void try_write(struct work_struct *work)
         iov.iov_base = con->buffer;
         iov.iov_len = 255;
         printk(KERN_INFO "about to send message\n");
-        ret = _ksendmsg(con->sock,&iov,1,iov.iov_len,0);
+        ret = _ksendmsg(con->sock,&iov,1,iov.iov_len);
         if (ret < 0) goto done;
         printk(KERN_INFO "wrote %d bytes to server\n", ret);
 
@@ -135,7 +96,7 @@ static void try_read(struct work_struct *work)
         if (con->buffer == NULL)
                 goto done;
 
-       len = _krecvmsg(con->sock,con->buffer,255,0);
+       len = _krecvmsg(con->sock,con->buffer,255);
        if (len < 0){
                printk(KERN_INFO "ERROR reading from socket\n");
                goto done;
@@ -148,179 +109,71 @@ done:
        return;
 }
 
-
 /*
- * Accepter thread
+ *  worker function when listener receives a connect
  */
 static void try_accept(struct work_struct *work)
 {
-       struct socket *sock, *new_sock;
-       struct sockaddr_in saddr;
         struct ceph_connection *new_con = NULL;
-       struct ceph_messenger *msgr;
-       int len;
-
-       msgr = container_of(work, struct ceph_messenger, awork);
-       sock = msgr->listen_sock;
+        struct ceph_messenger *msgr;
 
+        msgr = container_of(work, struct ceph_messenger, awork);
 
         printk(KERN_INFO "Entered try_accept\n");
 
-
-        if (kernel_accept(sock, &new_sock, sock->file->f_flags) < 0) {
-               printk(KERN_INFO "error accepting connection \n");
+        /* initialize the msgr connection */
+        new_con = new_connection(msgr);
+        if (new_con == NULL) {
+                printk(KERN_INFO "malloc failure\n");
                 goto done;
         }
-        printk(KERN_INFO "accepted connection \n");
 
-        /* get the address at the other end */
-        memset(&saddr, 0, sizeof(saddr));
-        if (new_sock->ops->getname(new_sock, (struct sockaddr *)&saddr, &len, 2)) {
-                printk(KERN_INFO "getname error connection aborted\n");
-                sock_release(new_sock);
+        if(_kaccept(msgr->listen_sock, new_con) < 0) {
+                printk(KERN_INFO "error accepting connection\n");
+                kfree(new_con);
                 goto done;
         }
+        printk(KERN_INFO "accepted connection \n");
 
-       /* initialize the msgr connection */
-       new_con = new_connection(msgr);
-       if (new_con == NULL) {
-                       printk(KERN_INFO "malloc failure\n");
-               sock_release(new_sock);
-               goto done;
-               }
-       new_con->sock = new_sock;
-       set_bit(ACCEPTING, &new_con->state);
-       /* new_con->in_tag = CEPH_MSGR_TAG_READY; */
-       /* fill in part of peers address */
-       new_con->peer_addr.ipaddr = saddr;
-
-       /* hand off to worker threads , send pending */
-       /*?? queue_work(send_wq, &new_con->swork);*/
+        /* prepare_write_accept_announce(msgr, new_con); */
+
+        /* add_connection_accepting(msgr, new_con); */
+
+        set_bit(WRITE_PEND, &new_con->state);
+        /*
+         * hand off to worker threads ,should be able to write, we want to
+         * try to write right away, we may have missed socket state change
+         */
+        queue_work(send_wq, &new_con->swork);
 done:
         return;
 }
 
 /*
- * create a new messenger instance, saddr is address specified from mount arg.
- * If null, will get created by _klisten()
+ * create a new messenger instance, creates a listening socket
  */
-struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr)
+struct ceph_messenger *ceph_messenger_create(void)
 {
         struct ceph_messenger *msgr;
+        int ret = 0;
 
-        msgr = kmalloc(sizeof(*msgr), GFP_KERNEL);
+        msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
         if (msgr == NULL)
-                return NULL;
-        memset(msgr, 0, sizeof(*msgr));
+                return ERR_PTR(-ENOMEM);
 
         spin_lock_init(&msgr->con_lock);
 
         /* create listening socket */
-        msgr->listen_sock = _klisten(saddr);
-        if (msgr->listen_sock == NULL) {
+        ret = _klisten(msgr);
+        if(ret < 0) {
                 kfree(msgr);
-                return NULL;
+                return  ERR_PTR(ret);
         }
-        /* TBD: setup callback for accept */
-        INIT_WORK(&msgr->awork, try_accept);       /* setup work structure */
-        return msgr;
-}
 
-/* Data available on socket or listen socket received a connect */
-static void ceph_data_ready(struct sock *sk, int count_unused)
-{
-        struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
-
-        printk(KERN_INFO "Entered ceph_data_ready state = %u\n", con->state);
-        if (test_bit(READ_PEND, &con->state)) {
-               printk(KERN_INFO "READ_PEND set in connection\n");
-                queue_work(recv_wq, &con->rwork);
-       }
-}
-
-static void ceph_write_space(struct sock *sk)
-{
-        struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
-
-        printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state);
-        if (test_bit(WRITE_PEND, &con->state)) {
-               printk(KERN_INFO "WRITE_PEND set in connection\n");
-                queue_work(send_wq, &con->swork);
-       }
-}
-
-static void ceph_state_change(struct sock *sk)
-{
-        struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
-       /* TBD: probably want to set our connection state to OPEN
-        * if state not set to READ or WRITE pending
-        */
-        printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state);
-        if (sk->sk_state == TCP_ESTABLISHED) {
-               if (test_and_clear_bit(CONNECTING, &con->state))
-                       set_bit(OPEN, &con->state);
-                ceph_write_space(sk);
-       }
-}
-
-/* Make a socket active */
-int add_sock_callbacks(struct socket *sock, struct ceph_connection *con)
-{
-       struct sock *sk = sock->sk;
-       sk->sk_user_data = con;
-        printk(KERN_INFO "Entered add_sock_callbacks\n");
-
-        /* Install callbacks */
-       sk->sk_data_ready = ceph_data_ready;
-       sk->sk_write_space = ceph_write_space;
-       sk->sk_state_change = ceph_state_change;
-
-        return 0;
-}
-
-/*
- *  workqueue initialization
- */
-
-int work_init(void)
-{
-        int ret = 0;
-
-       printk(KERN_INFO "entered work_init\n");
-        /*
-         * Create a num CPU threads to handle receive requests
-         * note: we can create more threads if needed to even out
-         * the scheduling of multiple requests..
-         */
-        recv_wq = create_workqueue("ceph-recv");
-        ret = IS_ERR(recv_wq);
-        if (ret) {
-                printk(KERN_INFO "receive worker failed to start: %d\n", ret);
-                destroy_workqueue(recv_wq);
-                return ret;
-        }
-
-        /*
-         * Create a single thread to handle send requests
-         * note: may use same thread pool as receive workers later...
-         */
-        send_wq = create_singlethread_workqueue("ceph-send");
-        ret = IS_ERR(send_wq);
-        if (ret) {
-                printk(KERN_INFO "send worker failed to start: %d\n", ret);
-                destroy_workqueue(send_wq);
-                return ret;
-        }
-       printk(KERN_INFO "successfully created wrkqueues\n");
-
-        return(ret);
-}
+        INIT_WORK(&msgr->awork, try_accept);       /* setup work structure */
 
-/*
- *  workqueue shutdown
- */
-void shutdown_workqueues(void)
-{
-        destroy_workqueue(send_wq);
-        destroy_workqueue(recv_wq);
+        printk(KERN_INFO "ceph_messenger_create listening on %x:%d\n",
+               ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr),
+               ntohl(msgr->inst.addr.ipaddr.sin_port));
+        return msgr;
 }
index ce173fc5effa231d7bf81b163401816ee0f65b6f..2a72b8cceae9670152472ffecedc6ab09f582b73 100644 (file)
@@ -40,6 +40,15 @@ int main(int argc, char *argv[])
                fprintf(stderr,"connection error\n");
                exit(1);        
        }
+       printf("connected to kernel server\n");
+        bzero(buf,256);
+        len = read(sd,buf,255);
+        if (len < 0) {
+                fprintf(stderr,"read error\n");
+                exit(1);
+        }
+        printf("Message received: %s\n",buf);
+
        printf("Please enter the message: ");
        bzero(buf,256);
        fgets(buf,255,stdin);