git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
builds now
author     patiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
           Sat, 10 Nov 2007 22:39:33 +0000 (22:39 +0000)
committer  patiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
           Sat, 10 Nov 2007 22:39:33 +0000 (22:39 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2044 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/poll.c

index 54384d98f39f2072daf1cc4ef785c728b545b80c..0609a9926baadcaba389aacfd6ae7b9a975473e8 100644
@@ -1,48 +1,55 @@
 #include <linux/kthread.h>
 #include <linux/socket.h>
 #include <linux/net.h>
-#include <linux/ceph_fs.h>
 #include <linux/string.h>
+#include <linux/fs.h>
 #include <linux/poll.h>
 #include <net/sock.h>
 #include <net/tcp.h>
+#include <linux/ceph_fs.h>
 #include "messenger.h"
 #include "ktcp.h"
 
+static struct workqueue_struct *recv_wq;        /* receive work queue */
+static struct workqueue_struct *send_wq;        /* send work queue */
 
-static int do_ceph_pollfd(struct file *file)
+/* TBD: probably remove pwait, but may experiment with it some.
+ * NULL for now: no timeout, and the timeout may be ignored under O_NONBLOCK anyway.
+ */
+static int do_ceph_pollfd(struct file *file, poll_table *pwait)
 {
         int mask;
-       struct sock *sk = file->private_data->sk;
+       struct socket *sock = (struct socket *)file->private_data;
+       struct sock *sk = sock->sk;
        /* may keep connection in poll list instead of using this field */
        struct ceph_connection *con = 
                (struct ceph_connection *)sk->sk_user_data;
 
 
-       mask = file->f_op->poll(file, NULL);
+       mask = file->f_op->poll(file, pwait);
 
         if (mask & POLLIN) {
-               printk(KERN_INFO "socket read ready: %d\n", mask);
                 /* if (sk->sk_state == TCP_LISTEN) */
-               if (test_bit(LISTENING, &con->state) {
+               printk(KERN_INFO "socket read ready: %d\n", mask);
+               if (test_bit(LISTENING, &con->state)) {
                        set_bit(ACCEPTING, &con->state);
-                        queue_work(recvwq, &con->awork)
+                        queue_work(recv_wq, &con->awork);
                        return(0); /* don't remove the listener from the poll list */
                } else {
                        /* set_bit(READ_SCHED, &con->state); */
-                       queue_work(recvwq, &con->rwork);
+                       queue_work(recv_wq, &con->rwork);
                }
         }
 
         if (mask & POLLOUT) {
                printk(KERN_INFO "socket read ready: %d\n", mask);
                /* set_bit(WRITE_SCHED, &con>state); */
-               queue_work(sendwq, &con->swork);
+               queue_work(send_wq, &con->swork);
         }
 
         if (mask & (POLLERR | POLLHUP)) {
-               printk(KERN_INFO "poll error: %d\n", mask);
-               /* TBD:  handle error need may need reconnect */
+               printk(KERN_INFO "poll hangup or error: %d\n", mask);
+               set_bit(CLOSED, &con->state);
         }
        return mask;
 }
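
The sk->sk_user_data back-pointer dereferenced above is the standard way to hang private per-connection state off a kernel socket. As a point of reference, a minimal sketch of how the connection might be attached when the socket is set up; struct ceph_connection is taken from the code above, but the helper itself is hypothetical:

    #include <net/sock.h>

    /* Hypothetical helper: stash the ceph_connection on the socket so that
     * do_ceph_pollfd() can later recover it via sk->sk_user_data. */
    static void ceph_con_attach(struct socket *sock, struct ceph_connection *con)
    {
            struct sock *sk = sock->sk;

            write_lock_bh(&sk->sk_callback_lock);   /* serialize with softirq callbacks */
            sk->sk_user_data = con;
            write_unlock_bh(&sk->sk_callback_lock);
    }
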
@@ -50,9 +57,10 @@ static int do_ceph_pollfd(struct file *file)
 /*
  * Poll thread function; started after the listener connection is created
  */
-static int ceph_poll(struct ceph_pollable *plist)
+static int ceph_poll(void *arg)
 {
-       struct ceph_pollable *pos, *tmp;
+       struct ceph_pollable *pos, *next;
+       struct ceph_pollable *pollables = arg;
 
         printk(KERN_INFO "starting kernel poll thread\n");
 
@@ -66,18 +74,18 @@ static int ceph_poll(struct ceph_pollable *plist)
          * doesn't have to wait for the timeout of the previous socket;
          * this will work better for a large number of file descriptors
          */
-               list_for_each_entry_safe(pos, tmp, plist, ceph_pollable) {
+               list_for_each_entry_safe(pos, next, &pollables->poll_list, poll_list) {
                        if (do_ceph_pollfd(pos->file, NULL)) {
-                               /* remove file from poll_list */
                                spin_lock(&pos->plock);
-                               list_del(pos->poll_list);
+                               /* remove file from poll_list */
+                               list_del(&pos->poll_list);
                                spin_unlock(&pos->plock);
                                /* TBD: free the list entry or reuse it.. need a reuse list */
                                /* double-check we're not freeing it out from under ourselves */
                                kfree(pos);
                        }
                }
-               schedule_timeout(timeout);
+               schedule_timeout(5*HZ);  /* TBD: make configurable */
         }
         set_current_state(TASK_RUNNING);
         printk(KERN_INFO "kernel thread exiting\n");
@@ -86,8 +94,8 @@ static int ceph_poll(struct ceph_pollable *plist)
 
 struct ceph_poll_task *start_poll(void)
 {
-        struct ceph_poll_task ptsk;
-        struct ceph_pollable pfiles;
+        struct ceph_poll_task *ptsk;
+        struct ceph_pollable *pfiles;
 
         ptsk = kmalloc(sizeof(struct ceph_poll_task), GFP_KERNEL);
         if (ptsk == NULL) {
@@ -101,17 +109,62 @@ struct ceph_poll_task *start_poll()
                 return 0;
         }
         memset(pfiles, 0, sizeof(*pfiles));
-       INIT_LIST_HEAD(&pfiles->poll_list)
+       INIT_LIST_HEAD(&pfiles->poll_list);
 
         /* start up poll thread */
         ptsk->poll_task = kthread_run(ceph_poll, pfiles, "ceph-poll");
+       if (IS_ERR(ptsk->poll_task)) {
+               /* TBD: cleanup - free pfiles and ptsk, return NULL */
+       }
        
        return(ptsk);
 }
 
-int stop_poll(struct ceph_poll_task *ptsk)
+void stop_poll(struct ceph_poll_task *ptsk)
 {
         wake_up_process(ptsk->poll_task); /* kick the thread out of schedule_timeout() */
         kthread_stop(ptsk->poll_task);    /* then wait for it to exit */
        /* Free up poll structures.. */
 }
+
+/*
+ * Initialize the work queues
+ */
+
+int ceph_work_init(void)
+{
+        int ret = 0;
+
+       /*
+        * Create one thread per CPU to handle receive requests.
+        * Note: we can create more threads if needed to even out
+        * the scheduling of multiple requests..
+        */
+        recv_wq = create_workqueue("ceph recv");
+        if (recv_wq == NULL) {
+               /* create_workqueue() returns NULL on failure, not ERR_PTR */
+               printk(KERN_INFO "receive worker failed to start\n");
+                ret = -ENOMEM;
+                return ret;
+        }
+
+       /*
+        * Create a single thread to handle send requests 
+        * note: may use same thread pool as receive workers later...
+        */
+        send_wq = create_singlethread_workqueue("ceph send");
+        if (send_wq == NULL) {
+               printk(KERN_INFO "send worker failed to start\n");
+                destroy_workqueue(recv_wq);    /* tear down the queue that did start */
+                ret = -ENOMEM;
+                return ret;
+        }
+
+        return(ret);
+}
+
+void ceph_work_shutdown(void)
+{
+       destroy_workqueue(send_wq);
+       destroy_workqueue(recv_wq);
+}
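
Taken together, a plausible wiring of these entry points into the module lifecycle might look as follows. ceph_work_init(), start_poll(), stop_poll() and ceph_work_shutdown() are the functions above; the module hooks and the error value chosen here are assumptions:

    #include <linux/module.h>
    #include <linux/errno.h>

    static struct ceph_poll_task *poll_task;

    /* Hypothetical module init: bring up the work queues first, since the
     * poll thread queues work on them, then start the poll kthread. */
    static int __init ceph_msgr_init(void)
    {
            int ret;

            ret = ceph_work_init();
            if (ret)
                    return ret;

            poll_task = start_poll();
            if (poll_task == NULL) {
                    ceph_work_shutdown();
                    return -ENOMEM;
            }
            return 0;
    }

    /* Tear down in reverse order: stop the producer (the poll thread)
     * before destroying the work queues it feeds. */
    static void __exit ceph_msgr_exit(void)
    {
            stop_poll(poll_task);
            ceph_work_shutdown();
    }

    module_init(ceph_msgr_init);
    module_exit(ceph_msgr_exit);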