obj-$(CONFIG_CEPHTESTS_FS) += ksocktest.o
-ksocktest-objs := kernclient.o messenger_mini.o ktcp.o
+ksocktest-objs := kernserver.o messenger_mini.o ktcp.o
MODULE_DESCRIPTION("kernel thread test for Linux");
MODULE_LICENSE("GPL");
-int work_init(void);
-void shutdown_workqueues(void);
-int add_sock_callbacks(struct socket *, struct ceph_connection *);
int inet_aton (const char *cp, struct in_addr *addr);
struct task_struct *thread;
printk(KERN_INFO "about to connect to server\n");
/* connect to server */
- if ((ret = do_connect(con))) {
+ if ((ret = _kconnect(con))) {
printk(KERN_INFO "error connecting %d\n", ret);
goto done;
}
--- /dev/null
+#include <linux/module.h>
+#include <linux/kthread.h>
+#include <linux/socket.h>
+#include <linux/net.h>
+#include <linux/string.h>
+#include <net/tcp.h>
+
+#include <linux/ceph_fs.h>
+#include "messenger.h"
+#include "ktcp.h"
+
+
+#define PORT 9009
+#define HOST cranium
+#define cranium 192.168.1.3
+
+MODULE_AUTHOR("Patience Warnick <patience@newdream.net>");
+MODULE_DESCRIPTION("kernel thread test for Linux");
+MODULE_LICENSE("GPL");
+
+struct ceph_messenger *ceph_messenger_create(void);
+int inet_aton (const char *cp, struct in_addr *addr);
+struct task_struct *thread;
+
+
+/*
+ * Wake up a thread when there is work to do
+ */
+/* void thread_wake_up(void)
+{
+}
+*/
+
+/*
+ * Client connect thread
+ */
+static int listen_thread(void *unusedfornow)
+{
+ struct ceph_messenger *msgr = NULL;
+
+ printk(KERN_INFO "starting kernel listen thread\n");
+
+ msgr = ceph_messenger_create();
+ if (msgr == NULL) return 1;
+
+ printk(KERN_INFO "about to listen\n");
+
+ set_current_state(TASK_INTERRUPTIBLE);
+ /* an endless loop in which we are doing our work */
+ while (!kthread_should_stop())
+ {
+ set_current_state(TASK_INTERRUPTIBLE);
+ schedule();
+ printk(KERN_INFO "sock thread has been interrupted\n");
+ }
+
+ set_current_state(TASK_RUNNING);
+ sock_release(msgr->listen_sock);
+ kfree(msgr);
+ printk(KERN_INFO "kernel thread exiting\n");
+ return 0;
+}
+/*
+ * Client connect thread
+ */
+static int connect_thread(void *unusedfornow)
+{
+ struct ceph_messenger *msgr = NULL;
+ struct ceph_connection *con;
+ struct sockaddr_in saddr;
+ int ret;
+
+ printk(KERN_INFO "starting kernel thread\n");
+
+ con = new_connection(msgr);
+
+ /* inet_aton("192.168.1.3", &saddr.sin_addr); */
+ /* con->peer_addr.ipaddr.sin_addr = saddr.sin_addr; */
+
+ saddr.sin_family = AF_INET;
+ saddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ saddr.sin_port = htons(PORT);
+ printk(KERN_INFO "saddr info %p\n", &saddr);
+ con->peer_addr.ipaddr = saddr;
+
+
+ set_bit(WRITE_PEND, &con->state);
+
+ printk(KERN_INFO "about to connect to server\n");
+ /* connect to server */
+ if ((ret = _kconnect(con))) {
+ printk(KERN_INFO "error connecting %d\n", ret);
+ goto done;
+ }
+ printk(KERN_INFO "connect succeeded\n");
+
+ set_current_state(TASK_INTERRUPTIBLE);
+ /* an endless loop in which we are doing our work */
+ while (!kthread_should_stop())
+ {
+ set_current_state(TASK_INTERRUPTIBLE);
+ schedule();
+ printk(KERN_INFO "sock thread has been interrupted\n");
+ }
+
+ set_current_state(TASK_RUNNING);
+ sock_release(con->sock);
+done:
+ kfree(con);
+ printk(KERN_INFO "kernel thread exiting\n");
+ return 0;
+}
+
+static int __init init_kst(void)
+{
+ int ret;
+
+ printk(KERN_INFO "kernel thread test init\n");
+ /* create new kernel threads */
+ ret = work_init();
+ thread = kthread_run(listen_thread, NULL, "listen-thread");
+ if (IS_ERR(thread))
+ {
+ printk(KERN_INFO "failured to start kernel thread\n");
+ return -ENOMEM;
+ }
+ return 0;
+}
+
+static void __exit exit_kst(void)
+{
+ printk(KERN_INFO "kernel thread test exit\n");
+ shutdown_workqueues();
+ kthread_stop(thread);
+ wake_up_process(thread);
+
+ return;
+}
+
+
+module_init(init_kst);
+module_exit(exit_kst);
#include "messenger.h"
#include "ktcp.h"
+struct workqueue_struct *recv_wq; /* receive work queue */
+struct workqueue_struct *send_wq; /* send work queue */
-struct socket * _kconnect(struct sockaddr *saddr)
+/*
+ * socket callback functions
+ */
+/* Data available on socket or listen socket received a connect */
+static void ceph_data_ready(struct sock *sk, int count_unused)
{
- int ret;
- struct socket *sd = NULL;
+ struct ceph_connection *con;
+ struct ceph_messenger *msgr;
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
- if (ret < 0) {
- printk(KERN_INFO "sock_create_kern error: %d\n", ret);
- return NULL;
+ printk(KERN_INFO "Entered ceph_data_ready \n");
+
+ if (sk->sk_state == TCP_LISTEN) {
+ msgr = (struct ceph_messenger *)sk->sk_user_data;
+ queue_work(recv_wq, &msgr->awork);
+ } else {
+ con = (struct ceph_connection *)sk->sk_user_data;
+ queue_work(recv_wq, &con->rwork);
}
+}
+
+/* socket has bufferspace for writing */
+static void ceph_write_space(struct sock *sk)
+{
+ struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
- ret = sd->ops->connect(sd, (struct sockaddr *) saddr,
- sizeof (struct sockaddr_in),0);
+ printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state);
+ if (test_bit(WRITE_PEND, &con->state)) {
+ printk(KERN_INFO "WRITE_PEND set in connection\n");
+ queue_work(send_wq, &con->swork);
+ }
+}
+
+/* sockets state has change */
+static void ceph_state_change(struct sock *sk)
+{
+ struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
+ printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state);
+
+ if (sk->sk_state == TCP_ESTABLISHED) {
+ if (test_and_clear_bit(CONNECTING, &con->state) ||
+ test_bit(ACCEPTING, &con->state))
+ set_bit(OPEN, &con->state);
+ ceph_write_space(sk);
+ }
+}
+
+/* make a socket active by setting up the call back functions */
+int add_sock_callbacks(struct socket *sock, struct ceph_connection *con)
+{
+ struct sock *sk = sock->sk;
+ sk->sk_user_data = con;
+ printk(KERN_INFO "Entered add_sock_callbacks\n");
+
+ /* Install callbacks */
+ sk->sk_data_ready = ceph_data_ready;
+ sk->sk_write_space = ceph_write_space;
+ sk->sk_state_change = ceph_state_change;
+
+ return 0;
+}
/*
- ret = sd->ops->connect(sd, (struct sockaddr *) saddr,
- sizeof (struct sockaddr_in),O_NONBLOCK);
+ * initiate connection to a remote socket.
+ */
+int _kconnect(struct ceph_connection *con)
+{
+ int ret;
+ struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
- if (ret == -EINPROGRESS) {
- printk(KERN_INFO "non-blocking connect in progress: %d\n", ret);
- goto done;
- }
-*/
+ set_bit(CONNECTING, &con->state);
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
if (ret < 0) {
- printk(KERN_INFO "kernel_connect error: %d\n", ret);
- sock_release(sd);
- }
+ printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+ goto done;
+ }
+
+ /* setup callbacks */
+ add_sock_callbacks(con->sock, con);
+
+
+ ret = con->sock->ops->connect(con->sock, paddr,
+ sizeof(struct sockaddr_in), O_NONBLOCK);
+ if (ret == -EINPROGRESS) return 0;
+ if (ret < 0) {
+ /* TBD check for fatal errors, retry if not fatal.. */
+ printk(KERN_INFO "kernel_connect error: %d\n", ret);
+ sock_release(con->sock);
+ con->sock = NULL;
+ }
done:
- return(sd);
+ return ret;
}
-struct socket * _klisten(struct sockaddr *saddr)
+/*
+ * setup listening socket
+ */
+int _klisten(struct ceph_messenger *msgr)
{
int ret;
- struct socket *sd = NULL;
- struct sockaddr_in *in_addr = (struct sockaddr_in *)saddr;
+ struct socket *sock = NULL;
+ int optval = 1;
+ struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret < 0) {
printk(KERN_INFO "sock_create_kern error: %d\n", ret);
- return(NULL);
+ return ret;
+ }
+ ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&optval, sizeof(optval));
+ if (ret < 0) {
+ printk("Failed to set SO_REUSEADDR: %d\n", ret);
+ goto err;
}
+ /* set user_data to be the messenger */
+ sock->sk->sk_user_data = msgr;
+
/* no user specified address given so create, will allow arg to mount */
- if (!in_addr->sin_addr.s_addr) {
- in_addr->sin_family = AF_INET;
- in_addr->sin_addr.s_addr = htonl(INADDR_ANY);
- in_addr->sin_port = htons(CEPH_PORT); /* known port for now */
- /* in_addr->sin_port = htons(0); */ /* any port */
+ myaddr->sin_family = AF_INET;
+ myaddr->sin_addr.s_addr = htonl(INADDR_ANY);
+ myaddr->sin_port = htons(CEPH_PORT); /* known port for now */
+ /* myaddr->sin_port = htons(0); */ /* any port */
+ ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
+ sizeof(struct sockaddr_in));
+ if (ret < 0) {
+ printk("Failed to bind to port %d\n", ret);
+ goto err;
}
-/* TBD: set sock options... */
- /* ret = kernel_setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
- (char *)optval, optlen);
+ ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+ (char *)&optval, sizeof(optval));
if (ret < 0) {
- printk("Failed to set SO_REUSEADDR: %d\n", ret);
- } */
- ret = sd->ops->bind(sd, saddr, sizeof(saddr));
-/* TBD: probaby want to tune the backlog queue .. */
- ret = sd->ops->listen(sd, NUM_BACKUP);
+ printk("Failed to set SO_KEEPALIVE: %d\n", ret);
+ goto err;
+ }
+
+ msgr->listen_sock = sock;
+
+ /* TBD: probaby want to tune the backlog queue .. */
+ ret = sock->ops->listen(sock, NUM_BACKUP);
if (ret < 0) {
printk(KERN_INFO "kernel_listen error: %d\n", ret);
- sock_release(sd);
- sd = NULL;
+ msgr->listen_sock = NULL;
+ goto err;
}
- return(sd);
+ return ret;
+err:
+ sock_release(sock);
+ return ret;
}
/*
- * Note: Maybe don't need this, or make inline... keep for now for debugging..
- * we may need to add more functionality
+ * accept a connection
*/
-struct socket *_kaccept(struct socket *sd)
+int _kaccept(struct socket *sock, struct ceph_connection *con)
{
- struct socket *new_sd = NULL;
int ret;
+ struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+ int len;
-
- ret = kernel_accept(sd, &new_sd, sd->file->f_flags);
+
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
if (ret < 0) {
- printk(KERN_INFO "kernel_accept error: %d\n", ret);
- return(new_sd);
+ printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+ goto done;
+ }
+
+ /* setup callbacks */
+ add_sock_callbacks(con->sock, con);
+
+
+ ret = sock->ops->accept(sock, con->sock, O_NONBLOCK);
+ /* ret = kernel_accept(sock, &new_sock, sock->file->f_flags); */
+ if (ret < 0) {
+ printk(KERN_INFO "accept error: %d\n", ret);
+ goto err;
+ }
+ con->sock->ops = sock->ops;
+ con->sock->type = sock->type;
+ ret = con->sock->ops->getname(con->sock, paddr, &len, 2);
+ if (ret < 0) {
+ printk(KERN_INFO "getname error: %d\n", ret);
+ goto err;
}
-/* TBD: shall we check name for validity? */
- return(new_sd);
+
+ set_bit(ACCEPTING, &con->state);
+done:
+ return ret;
+err:
+ sock_release(con->sock);
+ return ret;
}
/*
* receive a message this may return after partial send
*/
-int _krecvmsg(struct socket *sd, void *buf, size_t len, unsigned msgflags)
+int _krecvmsg(struct socket *sock, void *buf, size_t len)
{
struct kvec iov = {buf, len};
- struct msghdr msg = {.msg_flags = msgflags};
+ struct msghdr msg = {.msg_flags = 0};
int rlen = 0; /* length read */
printk(KERN_INFO "entered krevmsg\n");
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
/* receive one kvec for now... */
- rlen = kernel_recvmsg(sd, &msg, &iov, 1, len, msg.msg_flags);
+ rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
if (rlen < 0) {
printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen);
}
/*
* Send a message this may return after partial send
*/
-int _ksendmsg(struct socket *sd, struct kvec *iov,
- size_t kvlen, size_t len, unsigned msgflags)
+int _ksendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len)
{
- struct msghdr msg = {.msg_flags = msgflags};
+ struct msghdr msg = {.msg_flags = 0};
int rlen = 0;
printk(KERN_INFO "entered ksendmsg\n");
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
- rlen = kernel_sendmsg(sd, &msg, iov, kvlen, len);
+ rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
if (rlen < 0) {
printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen);
}
return(rlen);
}
-struct sockaddr *_kgetname(struct socket *sd)
+/*
+ * workqueue initialization
+ */
+
+int work_init(void)
{
- struct sockaddr *saddr = NULL;
- int len;
- int ret;
+ int ret = 0;
- if ((ret = sd->ops->getname(sd, (struct sockaddr *)saddr,
- &len, 2) < 0)) {
- printk(KERN_INFO "kernel getname error: %d\n", ret);
- }
- return(saddr);
+ printk(KERN_INFO "entered work_init\n");
+ /*
+ * Create a num CPU threads to handle receive requests
+ * note: we can create more threads if needed to even out
+ * the scheduling of multiple requests..
+ */
+ recv_wq = create_workqueue("ceph-recv");
+ ret = IS_ERR(recv_wq);
+ if (ret) {
+ printk(KERN_INFO "receive worker failed to start: %d\n", ret);
+ destroy_workqueue(recv_wq);
+ return ret;
+ }
+ /*
+ * Create a single thread to handle send requests
+ * note: may use same thread pool as receive workers later...
+ */
+ send_wq = create_singlethread_workqueue("ceph-send");
+ ret = IS_ERR(send_wq);
+ if (ret) {
+ printk(KERN_INFO "send worker failed to start: %d\n", ret);
+ destroy_workqueue(send_wq);
+ return ret;
+ }
+ printk(KERN_INFO "successfully created wrkqueues\n");
+
+ return(ret);
+}
+
+/*
+ * workqueue shutdown
+ */
+void shutdown_workqueues(void)
+{
+ destroy_workqueue(send_wq);
+ destroy_workqueue(recv_wq);
}
#ifndef _FS_CEPH_TCP_H
#define _FS_CEPH_TCP_H
+extern struct workqueue_struct *recv_wq; /* receive work queue */
+extern struct workqueue_struct *send_wq; /* send work queue */
+
/* prototype definitions */
-struct socket * _kconnect(struct sockaddr *);
-struct socket * _klisten(struct sockaddr *);
-struct socket *_kaccept(struct socket *);
-int _krecvmsg(struct socket *, void *, size_t , unsigned);
-int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned);
+int _kconnect(struct ceph_connection *);
+int _klisten(struct ceph_messenger *);
+int _kaccept(struct socket *, struct ceph_connection *);
+int _krecvmsg(struct socket *, void *, size_t );
+int _ksendmsg(struct socket *, struct kvec *, size_t, size_t);
+int work_init(void);
+void shutdown_workqueues(void);
/* Well known port for ceph client listener.. */
#define CEPH_PORT 2002
#include "messenger.h"
#include "ktcp.h"
-static struct workqueue_struct *recv_wq; /* receive work queue */
-static struct workqueue_struct *send_wq; /* send work queue */
-
static void try_read(struct work_struct *);
static void try_write(struct work_struct *);
static void try_accept(struct work_struct *);
return con;
}
-/*
- * initiate connection to a remote socket.
- *
- */
-int do_connect(struct ceph_connection *con)
-{
- struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
- int ret = 0;
-
-
- set_bit(CONNECTING, &con->state);
-
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
- if (ret < 0) {
- printk(KERN_INFO "sock_create_kern error: %d\n", ret);
- goto done;
- }
-
- /* setup callbacks */
- add_sock_callbacks(con->sock, con);
-
-
- ret = con->sock->ops->connect(con->sock, paddr,
- sizeof(struct sockaddr_in),O_NONBLOCK);
- if (ret == -EINPROGRESS) return 0;
- if (ret < 0) {
- /* TBD check for fatal errors, retry if not fatal.. */
- printk(KERN_INFO "kernel_connect error: %d\n", ret);
- sock_release(con->sock);
- con->sock = NULL;
- }
-done:
- printk(KERN_INFO "do_connect state = %u\n", con->state);
- return ret;
-}
-
/*
* call when socket is writeable
*/
iov.iov_base = con->buffer;
iov.iov_len = 255;
printk(KERN_INFO "about to send message\n");
- ret = _ksendmsg(con->sock,&iov,1,iov.iov_len,0);
+ ret = _ksendmsg(con->sock,&iov,1,iov.iov_len);
if (ret < 0) goto done;
printk(KERN_INFO "wrote %d bytes to server\n", ret);
if (con->buffer == NULL)
goto done;
- len = _krecvmsg(con->sock,con->buffer,255,0);
+ len = _krecvmsg(con->sock,con->buffer,255);
if (len < 0){
printk(KERN_INFO "ERROR reading from socket\n");
goto done;
return;
}
-
/*
- * Accepter thread
+ * worker function when listener receives a connect
*/
static void try_accept(struct work_struct *work)
{
- struct socket *sock, *new_sock;
- struct sockaddr_in saddr;
struct ceph_connection *new_con = NULL;
- struct ceph_messenger *msgr;
- int len;
-
- msgr = container_of(work, struct ceph_messenger, awork);
- sock = msgr->listen_sock;
+ struct ceph_messenger *msgr;
+ msgr = container_of(work, struct ceph_messenger, awork);
printk(KERN_INFO "Entered try_accept\n");
-
- if (kernel_accept(sock, &new_sock, sock->file->f_flags) < 0) {
- printk(KERN_INFO "error accepting connection \n");
+ /* initialize the msgr connection */
+ new_con = new_connection(msgr);
+ if (new_con == NULL) {
+ printk(KERN_INFO "malloc failure\n");
goto done;
}
- printk(KERN_INFO "accepted connection \n");
- /* get the address at the other end */
- memset(&saddr, 0, sizeof(saddr));
- if (new_sock->ops->getname(new_sock, (struct sockaddr *)&saddr, &len, 2)) {
- printk(KERN_INFO "getname error connection aborted\n");
- sock_release(new_sock);
+ if(_kaccept(msgr->listen_sock, new_con) < 0) {
+ printk(KERN_INFO "error accepting connection\n");
+ kfree(new_con);
goto done;
}
+ printk(KERN_INFO "accepted connection \n");
- /* initialize the msgr connection */
- new_con = new_connection(msgr);
- if (new_con == NULL) {
- printk(KERN_INFO "malloc failure\n");
- sock_release(new_sock);
- goto done;
- }
- new_con->sock = new_sock;
- set_bit(ACCEPTING, &new_con->state);
- /* new_con->in_tag = CEPH_MSGR_TAG_READY; */
- /* fill in part of peers address */
- new_con->peer_addr.ipaddr = saddr;
-
- /* hand off to worker threads , send pending */
- /*?? queue_work(send_wq, &new_con->swork);*/
+ /* prepare_write_accept_announce(msgr, new_con); */
+
+ /* add_connection_accepting(msgr, new_con); */
+
+ set_bit(WRITE_PEND, &new_con->state);
+ /*
+ * hand off to worker threads ,should be able to write, we want to
+ * try to write right away, we may have missed socket state change
+ */
+ queue_work(send_wq, &new_con->swork);
done:
return;
}
/*
- * create a new messenger instance, saddr is address specified from mount arg.
- * If null, will get created by _klisten()
+ * create a new messenger instance, creates a listening socket
*/
-struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr)
+struct ceph_messenger *ceph_messenger_create(void)
{
struct ceph_messenger *msgr;
+ int ret = 0;
- msgr = kmalloc(sizeof(*msgr), GFP_KERNEL);
+ msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
if (msgr == NULL)
- return NULL;
- memset(msgr, 0, sizeof(*msgr));
+ return ERR_PTR(-ENOMEM);
spin_lock_init(&msgr->con_lock);
/* create listening socket */
- msgr->listen_sock = _klisten(saddr);
- if (msgr->listen_sock == NULL) {
+ ret = _klisten(msgr);
+ if(ret < 0) {
kfree(msgr);
- return NULL;
+ return ERR_PTR(ret);
}
- /* TBD: setup callback for accept */
- INIT_WORK(&msgr->awork, try_accept); /* setup work structure */
- return msgr;
-}
-/* Data available on socket or listen socket received a connect */
-static void ceph_data_ready(struct sock *sk, int count_unused)
-{
- struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
-
- printk(KERN_INFO "Entered ceph_data_ready state = %u\n", con->state);
- if (test_bit(READ_PEND, &con->state)) {
- printk(KERN_INFO "READ_PEND set in connection\n");
- queue_work(recv_wq, &con->rwork);
- }
-}
-
-static void ceph_write_space(struct sock *sk)
-{
- struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
-
- printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state);
- if (test_bit(WRITE_PEND, &con->state)) {
- printk(KERN_INFO "WRITE_PEND set in connection\n");
- queue_work(send_wq, &con->swork);
- }
-}
-
-static void ceph_state_change(struct sock *sk)
-{
- struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
- /* TBD: probably want to set our connection state to OPEN
- * if state not set to READ or WRITE pending
- */
- printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state);
- if (sk->sk_state == TCP_ESTABLISHED) {
- if (test_and_clear_bit(CONNECTING, &con->state))
- set_bit(OPEN, &con->state);
- ceph_write_space(sk);
- }
-}
-
-/* Make a socket active */
-int add_sock_callbacks(struct socket *sock, struct ceph_connection *con)
-{
- struct sock *sk = sock->sk;
- sk->sk_user_data = con;
- printk(KERN_INFO "Entered add_sock_callbacks\n");
-
- /* Install callbacks */
- sk->sk_data_ready = ceph_data_ready;
- sk->sk_write_space = ceph_write_space;
- sk->sk_state_change = ceph_state_change;
-
- return 0;
-}
-
-/*
- * workqueue initialization
- */
-
-int work_init(void)
-{
- int ret = 0;
-
- printk(KERN_INFO "entered work_init\n");
- /*
- * Create a num CPU threads to handle receive requests
- * note: we can create more threads if needed to even out
- * the scheduling of multiple requests..
- */
- recv_wq = create_workqueue("ceph-recv");
- ret = IS_ERR(recv_wq);
- if (ret) {
- printk(KERN_INFO "receive worker failed to start: %d\n", ret);
- destroy_workqueue(recv_wq);
- return ret;
- }
-
- /*
- * Create a single thread to handle send requests
- * note: may use same thread pool as receive workers later...
- */
- send_wq = create_singlethread_workqueue("ceph-send");
- ret = IS_ERR(send_wq);
- if (ret) {
- printk(KERN_INFO "send worker failed to start: %d\n", ret);
- destroy_workqueue(send_wq);
- return ret;
- }
- printk(KERN_INFO "successfully created wrkqueues\n");
-
- return(ret);
-}
+ INIT_WORK(&msgr->awork, try_accept); /* setup work structure */
-/*
- * workqueue shutdown
- */
-void shutdown_workqueues(void)
-{
- destroy_workqueue(send_wq);
- destroy_workqueue(recv_wq);
+ printk(KERN_INFO "ceph_messenger_create listening on %x:%d\n",
+ ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr),
+ ntohl(msgr->inst.addr.ipaddr.sin_port));
+ return msgr;
}
fprintf(stderr,"connection error\n");
exit(1);
}
+ printf("connected to kernel server\n");
+ bzero(buf,256);
+ len = read(sd,buf,255);
+ if (len < 0) {
+ fprintf(stderr,"read error\n");
+ exit(1);
+ }
+ printf("Message received: %s\n",buf);
+
printf("Please enter the message: ");
bzero(buf,256);
fgets(buf,255,stdin);