]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2009 29311d96-e01e-0410-9327-a35deaa...
authorpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 31 Oct 2007 17:35:57 +0000 (17:35 +0000)
committerpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 31 Oct 2007 17:35:57 +0000 (17:35 +0000)
trunk/ceph/kernel/messenger.c

index b980f10ff0032a6208df1679a2b151f48e9bee6d..972fb9b80ef7856f58e183b72da2af4f66b36660 100644 (file)
+#include <linux/kthread.h>
 #include <linux/socket.h>
 #include <linux/net.h>
-#include <net/tcp.h>
+#include <linux/ceph_fs.h>
 #include <linux/string.h>
+#include <net/tcp.h>
 #include "kmsg.h"
+#include "ktcp.h"
+
+static struct workqueue_struct *recv_wq;        /* receive work queue ) */
+static struct workqueue_struct *send_wq;        /* send work queue */
+
+static void ceph_reader(struct work_struct *);
+static void ceph_writer(struct work_struct *);
+
+struct task_struct *athread;  /* accepter thread, TBD: fill into kmsgr */
 
-/* note: early stages, doesn't build... */
-extern struct ceph_message *ceph_read_message()
+/*
+ *  TBD: Not finished Still needs tons and tons of work....
+ */
+static struct ceph_message *ceph_read_message(struct socket *sd)
 {
        int ret;
        int received = 0;
-       kvec *iov = message->payload->b_kv;
+       struct ceph_message *message;
+       struct ceph_message_header env;
+       struct kvec *iov;
+       int i;
+
+       message = kmalloc(sizeof(struct ceph_message), GFP_KERNEL);
+       message->payload = kmalloc(sizeof(struct ceph_bufferlist),GFP_KERNEL);
+       if (message == NULL || message->payload == NULL){
+               printk(KERN_INFO "malloc failure\n");
+               return NULL;
+       }
+
+       ceph_bl_init(message->payload);
+       iov = message->payload->b_kv;
+
+       /* first read in the message header */
+       if (!_krecvmsg(sd, (char*)&env, sizeof(env), 0 )) {
+               return(NULL);
+       }
+       printk(KERN_INFO "reader got envelope type = %d \n" , env.type);
+       printk(KERN_INFO "num chunks = %d \n" , env.nchunks);
+/* TBD: print to info file rest of env */
 
-       while (received < len) {
-               _krecvmsg(socket, iov->iov_base, iov->iov_len);
+       /* receive request in chunks */
+       for (i = 0; i < env.nchunks; i++) {
+               u32 size = 0;
+               void *iov_base = NULL;
+               ret = _krecvmsg(sd, (char*)&size, sizeof(size), 0);
+               if (ret <= 0) {
+                       return(NULL);
+               }
+               /* try to allocate enough contiguous pages for this chunk */
+               iov_base = ceph_buffer_create(size);
+               if (iov_base == NULL) {
+                       printk(KERN_INFO "memory allocation error\n" );
+                       /* TBD: cleanup */
+               }
+               ret = _krecvmsg(sd, iov_base, size, 0);
+               /* TBD:  place in bufferlist (payload) */
+               received += ret;  /* keep track of complete size?? */
        }
+       message->payload->b_kvlen = i;
+       /* unmarshall message */
+
+       return(message);
 }
 
-extern int ceph_send_message(struct ceph_message *message)
+/*
+ *  TBD: Not finished Still needs a lot of work....
+ */
+static int ceph_send_message(struct ceph_message *message, struct socket *sd)
 {
        int ret;
        int sent = 0;
-       int len = message->bufferlist->b_kvlen;
-       kvec *iov = message->payload->b_kv;
+       int len = message->payload->b_kvlen;
+       struct kvec *iov = message->payload->b_kv;
+       struct ceph_message_header *msghdr = message->msghdr;
+       struct ceph_bufferlist blist;
 
-       /* while (num left to send > 0) { */
-       while (sent < len) {
-               ret = _ksendmsg(socket, iov->iov_base + sent, iov->iov_len, len - sent);
-               sent += ret;
-       }
+       /* marshall/encode message */
+       /* send in chunks */
        return sent;
 }
+/*
+ * The following functions are just for testing the comms stuff...
+ */
+static char *read_response(struct socket *sd)
+{
+       char *response;
+       /* response = kmalloc(RECBUF, GFP_KERNEL); */
 
-struct ceph_accepter ceph_accepter_init()
+       return (response);
+}
+static void send_reply(struct socket *sd, char *reply)
 {
-       struct socket *sd;
+       /* char *reply = kmalloc(SENDBUF, GFP_KERNEL); */
+
+       return;
+}
+/*
+ * Accepter thread
+ */
+static int ceph_accepter(void *unusedfornow)
+{
+       struct socket *sd, *new_sd;
        struct sockaddr saddr;
+       struct ceph_connection *con = NULL;
 
        memset(&saddr, 0, sizeof(saddr));
-       /* if .ceph.hosts file get host info from file */
+
+        printk(KERN_INFO "starting kernel thread\n");
+        set_current_state(TASK_INTERRUPTIBLE);
+
+       /* TBD: if address specified by mount */
                /* make my address from user specified address, fill in saddr */
+
        sd = _klisten(&saddr);
+
+        /* an endless loop in which we are accepting connections */
+        while (!kthread_should_stop()) {
+               /* TBD: should we schedule? or will accept take care of this? */
+               new_sd = _kaccept(sd);
+                printk(KERN_INFO "accepted connection \n");
+                set_current_state(TASK_INTERRUPTIBLE);
+               /* initialize the ceph connection */
+               con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+               if (con == NULL) {
+                       printk(KERN_INFO "malloc failure\n");
+                       sock_release(new_sd);
+                       break;
+               }
+               con->sock = new_sd;
+               con->state = READ_PENDING;
+               /* setup work structure */
+               INIT_WORK(&con->rwork, ceph_reader);
+               /* hand off to worker threads , read pending */
+               queue_work(recv_wq, &con->rwork);
+        }
+        set_current_state(TASK_RUNNING);
+        printk(KERN_INFO "kernel thread exiting\n");
+       sock_release(sd);
+        return(0);
+}
+
+void ceph_dispatch(struct ceph_message *msg)
+{
+       /* probably change the state and function of the recv worker *
+         * and requeue the work */ 
+       /* also maybe keep connection alive with timeout for further
+         * communication with server... not sure if we should use connection
+         * then for dispatching ?? */
 }
 
-void ceph_dispatch(ceph_message *msg)
+
+int ceph_work_init(void)
 {
+        int ret = 0;
+
+       /*
+        * Create a num CPU threads to handle receive requests
+        * note: we can create more threads if needed to even out
+        * the scheduling of multiple requests.. 
+        */
+        recv_wq = create_workqueue("ceph recv");
+        ret = IS_ERR(recv_wq);
+        if (ret) {
+               printk(KERN_INFO "receive worker failed to start: %d\n", ret);
+                destroy_workqueue(recv_wq);
+                return ret;
+        }
+
+       /*
+        * Create a single thread to handle send requests 
+        * note: may use same thread pool as receive workers later...
+        */
+        send_wq = create_singlethread_workqueue("ceph send");
+        ret = IS_ERR(send_wq);
+        if (ret) {
+               printk(KERN_INFO "send worker failed to start: %d\n", ret);
+                destroy_workqueue(send_wq);
+                return ret;
+        }
+/* TBD: need to do this during mount, one per kmsgr */
+       athread = kthread_run(ceph_accepter, NULL, "ceph accepter thread");
+
+        return(ret);
 }
 
-void make_addr(struct sockaddr *saddr, struct ceph_entity_addr *v)
+void ceph_work_shutdown(void)
+{
+/* TBD: need to do this during unmount*/
+/*
+       kthread_stop(msgr->athread);
+       wake_up_process(msgr->athread);
+*/
+       kthread_stop(athread);
+       wake_up_process(athread);
+       destroy_workqueue(send_wq);
+       destroy_workqueue(recv_wq);
+}
+
+/*
+static void make_addr(struct sockaddr *saddr, struct ceph_entity_addr *v)
 {
        struct sockaddr_in *in_addr = (struct sockaddr_in *)saddr;
 
-       memset(in_addr,0,sizof(in_addr));
-       in_addr.sin_family = AF_INET;
-       memcpy((char*)in_addr.sin_addr.s_addr, (char*)v.ipq, 4);
-       in_addr.sin_port = htons(v.port);
+       memset(in_addr,0,sizeof(struct sockaddr_in));
+       in_addr->sin_family = AF_INET;
+       in_addr->sin_addr.s_addr = 
+               htonl(create_address(v.ipq[0],v.ipq[1],v.ipq[2],v.ipq[3]));
+       memcpy((char*)in_addr->sin_addr.s_addr, (char*)v.ipq, 4);
+       in_addr->sin_port = htons(v.port);
 }
-void set_addr()
+static void set_addr()
+{
+}
+*/
+
+static void ceph_reader(struct work_struct *work)
 {
+       struct ceph_connection *con = 
+               container_of(work, struct ceph_connection, rwork);
+       /* char *reply = kmalloc(RCVBUF, GFP_KERNEL); */
+       char *response = NULL;
+
+/*     send_reply(con->socket, reply); */
+       return;
+}
+static void ceph_writer(struct work_struct *work)
+{
+       struct ceph_connection *con = 
+               container_of(work, struct ceph_connection, swork);
+       /* char *reply = kmalloc(RCVBUF, GFP_KERNEL); */
+       char *response = NULL;
+
+/*     response = read_response(con->socket); */
+       return;
 }