From d75f3fafeedffca8d0f6028c65f8cc6b11c964b9 Mon Sep 17 00:00:00 2001 From: patiencew Date: Tue, 27 Nov 2007 18:26:18 +0000 Subject: [PATCH] git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2118 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/test/Makefile | 2 +- trunk/ceph/kernel/test/kernclient.c | 5 +- trunk/ceph/kernel/test/kernserver.c | 142 ++++++++++++ trunk/ceph/kernel/test/ktcp.c | 275 ++++++++++++++++++------ trunk/ceph/kernel/test/ktcp.h | 15 +- trunk/ceph/kernel/test/messenger_mini.c | 219 ++++--------------- trunk/ceph/kernel/test/userclient.c | 9 + 7 files changed, 407 insertions(+), 260 deletions(-) create mode 100644 trunk/ceph/kernel/test/kernserver.c diff --git a/trunk/ceph/kernel/test/Makefile b/trunk/ceph/kernel/test/Makefile index 9582bc5af15f8..f45e9d04baa4e 100644 --- a/trunk/ceph/kernel/test/Makefile +++ b/trunk/ceph/kernel/test/Makefile @@ -4,4 +4,4 @@ obj-$(CONFIG_CEPHTESTS_FS) += ksocktest.o -ksocktest-objs := kernclient.o messenger_mini.o ktcp.o +ksocktest-objs := kernserver.o messenger_mini.o ktcp.o diff --git a/trunk/ceph/kernel/test/kernclient.c b/trunk/ceph/kernel/test/kernclient.c index b5acc0aacde74..568f1685e43db 100644 --- a/trunk/ceph/kernel/test/kernclient.c +++ b/trunk/ceph/kernel/test/kernclient.c @@ -18,9 +18,6 @@ MODULE_AUTHOR("Patience Warnick "); MODULE_DESCRIPTION("kernel thread test for Linux"); MODULE_LICENSE("GPL"); -int work_init(void); -void shutdown_workqueues(void); -int add_sock_callbacks(struct socket *, struct ceph_connection *); int inet_aton (const char *cp, struct in_addr *addr); struct task_struct *thread; @@ -60,7 +57,7 @@ static int sock_thread(void *unusedfornow) printk(KERN_INFO "about to connect to server\n"); /* connect to server */ - if ((ret = do_connect(con))) { + if ((ret = _kconnect(con))) { printk(KERN_INFO "error connecting %d\n", ret); goto done; } diff --git a/trunk/ceph/kernel/test/kernserver.c b/trunk/ceph/kernel/test/kernserver.c new file mode 100644 index 0000000000000..5c6b0b299f98a --- /dev/null +++ b/trunk/ceph/kernel/test/kernserver.c @@ -0,0 +1,142 @@ +#include +#include +#include +#include +#include +#include + +#include +#include "messenger.h" +#include "ktcp.h" + + +#define PORT 9009 +#define HOST cranium +#define cranium 192.168.1.3 + +MODULE_AUTHOR("Patience Warnick "); +MODULE_DESCRIPTION("kernel thread test for Linux"); +MODULE_LICENSE("GPL"); + +struct ceph_messenger *ceph_messenger_create(void); +int inet_aton (const char *cp, struct in_addr *addr); +struct task_struct *thread; + + +/* + * Wake up a thread when there is work to do + */ +/* void thread_wake_up(void) +{ +} +*/ + +/* + * Client connect thread + */ +static int listen_thread(void *unusedfornow) +{ + struct ceph_messenger *msgr = NULL; + + printk(KERN_INFO "starting kernel listen thread\n"); + + msgr = ceph_messenger_create(); + if (msgr == NULL) return 1; + + printk(KERN_INFO "about to listen\n"); + + set_current_state(TASK_INTERRUPTIBLE); + /* an endless loop in which we are doing our work */ + while (!kthread_should_stop()) + { + set_current_state(TASK_INTERRUPTIBLE); + schedule(); + printk(KERN_INFO "sock thread has been interrupted\n"); + } + + set_current_state(TASK_RUNNING); + sock_release(msgr->listen_sock); + kfree(msgr); + printk(KERN_INFO "kernel thread exiting\n"); + return 0; +} +/* + * Client connect thread + */ +static int connect_thread(void *unusedfornow) +{ + struct ceph_messenger *msgr = NULL; + struct ceph_connection *con; + struct sockaddr_in saddr; + int ret; + + printk(KERN_INFO "starting kernel thread\n"); + + con = new_connection(msgr); + + /* inet_aton("192.168.1.3", &saddr.sin_addr); */ + /* con->peer_addr.ipaddr.sin_addr = saddr.sin_addr; */ + + saddr.sin_family = AF_INET; + saddr.sin_addr.s_addr = htonl(INADDR_ANY); + saddr.sin_port = htons(PORT); + printk(KERN_INFO "saddr info %p\n", &saddr); + con->peer_addr.ipaddr = saddr; + + + set_bit(WRITE_PEND, &con->state); + + printk(KERN_INFO "about to connect to server\n"); + /* connect to server */ + if ((ret = _kconnect(con))) { + printk(KERN_INFO "error connecting %d\n", ret); + goto done; + } + printk(KERN_INFO "connect succeeded\n"); + + set_current_state(TASK_INTERRUPTIBLE); + /* an endless loop in which we are doing our work */ + while (!kthread_should_stop()) + { + set_current_state(TASK_INTERRUPTIBLE); + schedule(); + printk(KERN_INFO "sock thread has been interrupted\n"); + } + + set_current_state(TASK_RUNNING); + sock_release(con->sock); +done: + kfree(con); + printk(KERN_INFO "kernel thread exiting\n"); + return 0; +} + +static int __init init_kst(void) +{ + int ret; + + printk(KERN_INFO "kernel thread test init\n"); + /* create new kernel threads */ + ret = work_init(); + thread = kthread_run(listen_thread, NULL, "listen-thread"); + if (IS_ERR(thread)) + { + printk(KERN_INFO "failured to start kernel thread\n"); + return -ENOMEM; + } + return 0; +} + +static void __exit exit_kst(void) +{ + printk(KERN_INFO "kernel thread test exit\n"); + shutdown_workqueues(); + kthread_stop(thread); + wake_up_process(thread); + + return; +} + + +module_init(init_kst); +module_exit(exit_kst); diff --git a/trunk/ceph/kernel/test/ktcp.c b/trunk/ceph/kernel/test/ktcp.c index 12decbef93fcb..b96f506817c08 100644 --- a/trunk/ceph/kernel/test/ktcp.c +++ b/trunk/ceph/kernel/test/ktcp.c @@ -5,109 +5,218 @@ #include "messenger.h" #include "ktcp.h" +struct workqueue_struct *recv_wq; /* receive work queue */ +struct workqueue_struct *send_wq; /* send work queue */ -struct socket * _kconnect(struct sockaddr *saddr) +/* + * socket callback functions + */ +/* Data available on socket or listen socket received a connect */ +static void ceph_data_ready(struct sock *sk, int count_unused) { - int ret; - struct socket *sd = NULL; + struct ceph_connection *con; + struct ceph_messenger *msgr; - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd); - if (ret < 0) { - printk(KERN_INFO "sock_create_kern error: %d\n", ret); - return NULL; + printk(KERN_INFO "Entered ceph_data_ready \n"); + + if (sk->sk_state == TCP_LISTEN) { + msgr = (struct ceph_messenger *)sk->sk_user_data; + queue_work(recv_wq, &msgr->awork); + } else { + con = (struct ceph_connection *)sk->sk_user_data; + queue_work(recv_wq, &con->rwork); } +} + +/* socket has bufferspace for writing */ +static void ceph_write_space(struct sock *sk) +{ + struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; - ret = sd->ops->connect(sd, (struct sockaddr *) saddr, - sizeof (struct sockaddr_in),0); + printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state); + if (test_bit(WRITE_PEND, &con->state)) { + printk(KERN_INFO "WRITE_PEND set in connection\n"); + queue_work(send_wq, &con->swork); + } +} + +/* sockets state has change */ +static void ceph_state_change(struct sock *sk) +{ + struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; + printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state); + + if (sk->sk_state == TCP_ESTABLISHED) { + if (test_and_clear_bit(CONNECTING, &con->state) || + test_bit(ACCEPTING, &con->state)) + set_bit(OPEN, &con->state); + ceph_write_space(sk); + } +} + +/* make a socket active by setting up the call back functions */ +int add_sock_callbacks(struct socket *sock, struct ceph_connection *con) +{ + struct sock *sk = sock->sk; + sk->sk_user_data = con; + printk(KERN_INFO "Entered add_sock_callbacks\n"); + + /* Install callbacks */ + sk->sk_data_ready = ceph_data_ready; + sk->sk_write_space = ceph_write_space; + sk->sk_state_change = ceph_state_change; + + return 0; +} /* - ret = sd->ops->connect(sd, (struct sockaddr *) saddr, - sizeof (struct sockaddr_in),O_NONBLOCK); + * initiate connection to a remote socket. + */ +int _kconnect(struct ceph_connection *con) +{ + int ret; + struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; - if (ret == -EINPROGRESS) { - printk(KERN_INFO "non-blocking connect in progress: %d\n", ret); - goto done; - } -*/ + set_bit(CONNECTING, &con->state); + ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); if (ret < 0) { - printk(KERN_INFO "kernel_connect error: %d\n", ret); - sock_release(sd); - } + printk(KERN_INFO "sock_create_kern error: %d\n", ret); + goto done; + } + + /* setup callbacks */ + add_sock_callbacks(con->sock, con); + + + ret = con->sock->ops->connect(con->sock, paddr, + sizeof(struct sockaddr_in), O_NONBLOCK); + if (ret == -EINPROGRESS) return 0; + if (ret < 0) { + /* TBD check for fatal errors, retry if not fatal.. */ + printk(KERN_INFO "kernel_connect error: %d\n", ret); + sock_release(con->sock); + con->sock = NULL; + } done: - return(sd); + return ret; } -struct socket * _klisten(struct sockaddr *saddr) +/* + * setup listening socket + */ +int _klisten(struct ceph_messenger *msgr) { int ret; - struct socket *sd = NULL; - struct sockaddr_in *in_addr = (struct sockaddr_in *)saddr; + struct socket *sock = NULL; + int optval = 1; + struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr; - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd); + ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret < 0) { printk(KERN_INFO "sock_create_kern error: %d\n", ret); - return(NULL); + return ret; + } + ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&optval, sizeof(optval)); + if (ret < 0) { + printk("Failed to set SO_REUSEADDR: %d\n", ret); + goto err; } + /* set user_data to be the messenger */ + sock->sk->sk_user_data = msgr; + /* no user specified address given so create, will allow arg to mount */ - if (!in_addr->sin_addr.s_addr) { - in_addr->sin_family = AF_INET; - in_addr->sin_addr.s_addr = htonl(INADDR_ANY); - in_addr->sin_port = htons(CEPH_PORT); /* known port for now */ - /* in_addr->sin_port = htons(0); */ /* any port */ + myaddr->sin_family = AF_INET; + myaddr->sin_addr.s_addr = htonl(INADDR_ANY); + myaddr->sin_port = htons(CEPH_PORT); /* known port for now */ + /* myaddr->sin_port = htons(0); */ /* any port */ + ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, + sizeof(struct sockaddr_in)); + if (ret < 0) { + printk("Failed to bind to port %d\n", ret); + goto err; } -/* TBD: set sock options... */ - /* ret = kernel_setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, - (char *)optval, optlen); + ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + (char *)&optval, sizeof(optval)); if (ret < 0) { - printk("Failed to set SO_REUSEADDR: %d\n", ret); - } */ - ret = sd->ops->bind(sd, saddr, sizeof(saddr)); -/* TBD: probaby want to tune the backlog queue .. */ - ret = sd->ops->listen(sd, NUM_BACKUP); + printk("Failed to set SO_KEEPALIVE: %d\n", ret); + goto err; + } + + msgr->listen_sock = sock; + + /* TBD: probaby want to tune the backlog queue .. */ + ret = sock->ops->listen(sock, NUM_BACKUP); if (ret < 0) { printk(KERN_INFO "kernel_listen error: %d\n", ret); - sock_release(sd); - sd = NULL; + msgr->listen_sock = NULL; + goto err; } - return(sd); + return ret; +err: + sock_release(sock); + return ret; } /* - * Note: Maybe don't need this, or make inline... keep for now for debugging.. - * we may need to add more functionality + * accept a connection */ -struct socket *_kaccept(struct socket *sd) +int _kaccept(struct socket *sock, struct ceph_connection *con) { - struct socket *new_sd = NULL; int ret; + struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; + int len; - - ret = kernel_accept(sd, &new_sd, sd->file->f_flags); + + ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); if (ret < 0) { - printk(KERN_INFO "kernel_accept error: %d\n", ret); - return(new_sd); + printk(KERN_INFO "sock_create_kern error: %d\n", ret); + goto done; + } + + /* setup callbacks */ + add_sock_callbacks(con->sock, con); + + + ret = sock->ops->accept(sock, con->sock, O_NONBLOCK); + /* ret = kernel_accept(sock, &new_sock, sock->file->f_flags); */ + if (ret < 0) { + printk(KERN_INFO "accept error: %d\n", ret); + goto err; + } + con->sock->ops = sock->ops; + con->sock->type = sock->type; + ret = con->sock->ops->getname(con->sock, paddr, &len, 2); + if (ret < 0) { + printk(KERN_INFO "getname error: %d\n", ret); + goto err; } -/* TBD: shall we check name for validity? */ - return(new_sd); + + set_bit(ACCEPTING, &con->state); +done: + return ret; +err: + sock_release(con->sock); + return ret; } /* * receive a message this may return after partial send */ -int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags) +int _krecvmsg(struct socket *sock, void *buf, size_t len) { struct kvec iov = {buf, len}; - struct msghdr msg = {.msg_flags = msgflags}; + struct msghdr msg = {.msg_flags = 0}; int rlen = 0; /* length read */ printk(KERN_INFO "entered krevmsg\n"); msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; /* receive one kvec for now... */ - rlen = kernel_recvmsg(sd, &msg, &iov, 1, len, msg.msg_flags); + rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); if (rlen < 0) { printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen); } @@ -120,32 +229,64 @@ int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags) /* * Send a message this may return after partial send */ -int _ksendmsg(struct socket *sd, struct kvec *iov, - size_t kvlen, size_t len, unsigned msgflags) +int _ksendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len) { - struct msghdr msg = {.msg_flags = msgflags}; + struct msghdr msg = {.msg_flags = 0}; int rlen = 0; printk(KERN_INFO "entered ksendmsg\n"); msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; - rlen = kernel_sendmsg(sd, &msg, iov, kvlen, len); + rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len); if (rlen < 0) { printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen); } return(rlen); } -struct sockaddr *_kgetname(struct socket *sd) +/* + * workqueue initialization + */ + +int work_init(void) { - struct sockaddr *saddr = NULL; - int len; - int ret; + int ret = 0; - if ((ret = sd->ops->getname(sd, (struct sockaddr *)saddr, - &len, 2) < 0)) { - printk(KERN_INFO "kernel getname error: %d\n", ret); - } - return(saddr); + printk(KERN_INFO "entered work_init\n"); + /* + * Create a num CPU threads to handle receive requests + * note: we can create more threads if needed to even out + * the scheduling of multiple requests.. + */ + recv_wq = create_workqueue("ceph-recv"); + ret = IS_ERR(recv_wq); + if (ret) { + printk(KERN_INFO "receive worker failed to start: %d\n", ret); + destroy_workqueue(recv_wq); + return ret; + } + /* + * Create a single thread to handle send requests + * note: may use same thread pool as receive workers later... + */ + send_wq = create_singlethread_workqueue("ceph-send"); + ret = IS_ERR(send_wq); + if (ret) { + printk(KERN_INFO "send worker failed to start: %d\n", ret); + destroy_workqueue(send_wq); + return ret; + } + printk(KERN_INFO "successfully created wrkqueues\n"); + + return(ret); +} + +/* + * workqueue shutdown + */ +void shutdown_workqueues(void) +{ + destroy_workqueue(send_wq); + destroy_workqueue(recv_wq); } diff --git a/trunk/ceph/kernel/test/ktcp.h b/trunk/ceph/kernel/test/ktcp.h index a6f6d1f0a8b57..841dd7bba02fd 100644 --- a/trunk/ceph/kernel/test/ktcp.h +++ b/trunk/ceph/kernel/test/ktcp.h @@ -1,12 +1,17 @@ #ifndef _FS_CEPH_TCP_H #define _FS_CEPH_TCP_H +extern struct workqueue_struct *recv_wq; /* receive work queue */ +extern struct workqueue_struct *send_wq; /* send work queue */ + /* prototype definitions */ -struct socket * _kconnect(struct sockaddr *); -struct socket * _klisten(struct sockaddr *); -struct socket *_kaccept(struct socket *); -int _krecvmsg(struct socket *, void *, size_t , unsigned); -int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned); +int _kconnect(struct ceph_connection *); +int _klisten(struct ceph_messenger *); +int _kaccept(struct socket *, struct ceph_connection *); +int _krecvmsg(struct socket *, void *, size_t ); +int _ksendmsg(struct socket *, struct kvec *, size_t, size_t); +int work_init(void); +void shutdown_workqueues(void); /* Well known port for ceph client listener.. */ #define CEPH_PORT 2002 diff --git a/trunk/ceph/kernel/test/messenger_mini.c b/trunk/ceph/kernel/test/messenger_mini.c index 5d649d446e263..b57de70039759 100644 --- a/trunk/ceph/kernel/test/messenger_mini.c +++ b/trunk/ceph/kernel/test/messenger_mini.c @@ -8,9 +8,6 @@ #include "messenger.h" #include "ktcp.h" -static struct workqueue_struct *recv_wq; /* receive work queue */ -static struct workqueue_struct *send_wq; /* send work queue */ - static void try_read(struct work_struct *); static void try_write(struct work_struct *); static void try_accept(struct work_struct *); @@ -43,42 +40,6 @@ struct ceph_connection *new_connection(struct ceph_messenger *msgr) return con; } -/* - * initiate connection to a remote socket. - * - */ -int do_connect(struct ceph_connection *con) -{ - struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; - int ret = 0; - - - set_bit(CONNECTING, &con->state); - - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); - if (ret < 0) { - printk(KERN_INFO "sock_create_kern error: %d\n", ret); - goto done; - } - - /* setup callbacks */ - add_sock_callbacks(con->sock, con); - - - ret = con->sock->ops->connect(con->sock, paddr, - sizeof(struct sockaddr_in),O_NONBLOCK); - if (ret == -EINPROGRESS) return 0; - if (ret < 0) { - /* TBD check for fatal errors, retry if not fatal.. */ - printk(KERN_INFO "kernel_connect error: %d\n", ret); - sock_release(con->sock); - con->sock = NULL; - } -done: - printk(KERN_INFO "do_connect state = %u\n", con->state); - return ret; -} - /* * call when socket is writeable */ @@ -106,7 +67,7 @@ static void try_write(struct work_struct *work) iov.iov_base = con->buffer; iov.iov_len = 255; printk(KERN_INFO "about to send message\n"); - ret = _ksendmsg(con->sock,&iov,1,iov.iov_len,0); + ret = _ksendmsg(con->sock,&iov,1,iov.iov_len); if (ret < 0) goto done; printk(KERN_INFO "wrote %d bytes to server\n", ret); @@ -135,7 +96,7 @@ static void try_read(struct work_struct *work) if (con->buffer == NULL) goto done; - len = _krecvmsg(con->sock,con->buffer,255,0); + len = _krecvmsg(con->sock,con->buffer,255); if (len < 0){ printk(KERN_INFO "ERROR reading from socket\n"); goto done; @@ -148,179 +109,71 @@ done: return; } - /* - * Accepter thread + * worker function when listener receives a connect */ static void try_accept(struct work_struct *work) { - struct socket *sock, *new_sock; - struct sockaddr_in saddr; struct ceph_connection *new_con = NULL; - struct ceph_messenger *msgr; - int len; - - msgr = container_of(work, struct ceph_messenger, awork); - sock = msgr->listen_sock; + struct ceph_messenger *msgr; + msgr = container_of(work, struct ceph_messenger, awork); printk(KERN_INFO "Entered try_accept\n"); - - if (kernel_accept(sock, &new_sock, sock->file->f_flags) < 0) { - printk(KERN_INFO "error accepting connection \n"); + /* initialize the msgr connection */ + new_con = new_connection(msgr); + if (new_con == NULL) { + printk(KERN_INFO "malloc failure\n"); goto done; } - printk(KERN_INFO "accepted connection \n"); - /* get the address at the other end */ - memset(&saddr, 0, sizeof(saddr)); - if (new_sock->ops->getname(new_sock, (struct sockaddr *)&saddr, &len, 2)) { - printk(KERN_INFO "getname error connection aborted\n"); - sock_release(new_sock); + if(_kaccept(msgr->listen_sock, new_con) < 0) { + printk(KERN_INFO "error accepting connection\n"); + kfree(new_con); goto done; } + printk(KERN_INFO "accepted connection \n"); - /* initialize the msgr connection */ - new_con = new_connection(msgr); - if (new_con == NULL) { - printk(KERN_INFO "malloc failure\n"); - sock_release(new_sock); - goto done; - } - new_con->sock = new_sock; - set_bit(ACCEPTING, &new_con->state); - /* new_con->in_tag = CEPH_MSGR_TAG_READY; */ - /* fill in part of peers address */ - new_con->peer_addr.ipaddr = saddr; - - /* hand off to worker threads , send pending */ - /*?? queue_work(send_wq, &new_con->swork);*/ + /* prepare_write_accept_announce(msgr, new_con); */ + + /* add_connection_accepting(msgr, new_con); */ + + set_bit(WRITE_PEND, &new_con->state); + /* + * hand off to worker threads ,should be able to write, we want to + * try to write right away, we may have missed socket state change + */ + queue_work(send_wq, &new_con->swork); done: return; } /* - * create a new messenger instance, saddr is address specified from mount arg. - * If null, will get created by _klisten() + * create a new messenger instance, creates a listening socket */ -struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr) +struct ceph_messenger *ceph_messenger_create(void) { struct ceph_messenger *msgr; + int ret = 0; - msgr = kmalloc(sizeof(*msgr), GFP_KERNEL); + msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); if (msgr == NULL) - return NULL; - memset(msgr, 0, sizeof(*msgr)); + return ERR_PTR(-ENOMEM); spin_lock_init(&msgr->con_lock); /* create listening socket */ - msgr->listen_sock = _klisten(saddr); - if (msgr->listen_sock == NULL) { + ret = _klisten(msgr); + if(ret < 0) { kfree(msgr); - return NULL; + return ERR_PTR(ret); } - /* TBD: setup callback for accept */ - INIT_WORK(&msgr->awork, try_accept); /* setup work structure */ - return msgr; -} -/* Data available on socket or listen socket received a connect */ -static void ceph_data_ready(struct sock *sk, int count_unused) -{ - struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; - - printk(KERN_INFO "Entered ceph_data_ready state = %u\n", con->state); - if (test_bit(READ_PEND, &con->state)) { - printk(KERN_INFO "READ_PEND set in connection\n"); - queue_work(recv_wq, &con->rwork); - } -} - -static void ceph_write_space(struct sock *sk) -{ - struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; - - printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state); - if (test_bit(WRITE_PEND, &con->state)) { - printk(KERN_INFO "WRITE_PEND set in connection\n"); - queue_work(send_wq, &con->swork); - } -} - -static void ceph_state_change(struct sock *sk) -{ - struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; - /* TBD: probably want to set our connection state to OPEN - * if state not set to READ or WRITE pending - */ - printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state); - if (sk->sk_state == TCP_ESTABLISHED) { - if (test_and_clear_bit(CONNECTING, &con->state)) - set_bit(OPEN, &con->state); - ceph_write_space(sk); - } -} - -/* Make a socket active */ -int add_sock_callbacks(struct socket *sock, struct ceph_connection *con) -{ - struct sock *sk = sock->sk; - sk->sk_user_data = con; - printk(KERN_INFO "Entered add_sock_callbacks\n"); - - /* Install callbacks */ - sk->sk_data_ready = ceph_data_ready; - sk->sk_write_space = ceph_write_space; - sk->sk_state_change = ceph_state_change; - - return 0; -} - -/* - * workqueue initialization - */ - -int work_init(void) -{ - int ret = 0; - - printk(KERN_INFO "entered work_init\n"); - /* - * Create a num CPU threads to handle receive requests - * note: we can create more threads if needed to even out - * the scheduling of multiple requests.. - */ - recv_wq = create_workqueue("ceph-recv"); - ret = IS_ERR(recv_wq); - if (ret) { - printk(KERN_INFO "receive worker failed to start: %d\n", ret); - destroy_workqueue(recv_wq); - return ret; - } - - /* - * Create a single thread to handle send requests - * note: may use same thread pool as receive workers later... - */ - send_wq = create_singlethread_workqueue("ceph-send"); - ret = IS_ERR(send_wq); - if (ret) { - printk(KERN_INFO "send worker failed to start: %d\n", ret); - destroy_workqueue(send_wq); - return ret; - } - printk(KERN_INFO "successfully created wrkqueues\n"); - - return(ret); -} + INIT_WORK(&msgr->awork, try_accept); /* setup work structure */ -/* - * workqueue shutdown - */ -void shutdown_workqueues(void) -{ - destroy_workqueue(send_wq); - destroy_workqueue(recv_wq); + printk(KERN_INFO "ceph_messenger_create listening on %x:%d\n", + ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), + ntohl(msgr->inst.addr.ipaddr.sin_port)); + return msgr; } diff --git a/trunk/ceph/kernel/test/userclient.c b/trunk/ceph/kernel/test/userclient.c index ce173fc5effa2..2a72b8cceae96 100644 --- a/trunk/ceph/kernel/test/userclient.c +++ b/trunk/ceph/kernel/test/userclient.c @@ -40,6 +40,15 @@ int main(int argc, char *argv[]) fprintf(stderr,"connection error\n"); exit(1); } + printf("connected to kernel server\n"); + bzero(buf,256); + len = read(sd,buf,255); + if (len < 0) { + fprintf(stderr,"read error\n"); + exit(1); + } + printf("Message received: %s\n",buf); + printf("Please enter the message: "); bzero(buf,256); fgets(buf,255,stdin); -- 2.39.5