#include <linux/kthread.h>
#include <linux/socket.h>
#include <linux/net.h>
-#include <linux/ceph_fs.h>
#include <linux/string.h>
+#include <linux/fs.h>
#include <linux/poll.h>
#include <net/sock.h>
#include <net/tcp.h>
+#include <linux/ceph_fs.h>
#include "messenger.h"
#include "ktcp.h"
+static struct workqueue_struct *recv_wq; /* receive work queue */
+static struct workqueue_struct *send_wq; /* send work queue */
-static int do_ceph_pollfd(struct file *file)
+/* TBD: probably remove pwait, but may play around with it some..
+ * null for now.. No timeout, timeout maybe ignored if O_NONBLOCK anyway..
+*/
+static int do_ceph_pollfd(struct file *file, poll_table *pwait)
{
int mask;
- struct sock *sk = file->private_data->sk;
+ struct socket *sock = (struct socket *)file->private_data;
+ struct sock *sk = sock->sk;
/* may keep connection in poll list instead of using this field */
struct ceph_connection *con =
(struct ceph_connection *)sk->sk_user_data;
- mask = file->f_op->poll(file, NULL);
+ mask = file->f_op->poll(file, pwait);
if (mask & POLLIN) {
- printk(KERN_INFO "socket read ready: %d\n", mask);
/* if (sk->sk_state == TCP_LISTEN) */
- if (test_bit(LISTENING, &con->state) {
+ printk(KERN_INFO "socket read ready: %d\n", mask);
+ if (test_bit(LISTENING, &con->state)) {
set_bit(ACCEPTING, &con->state);
- queue_work(recvwq, &con->awork)
+ queue_work(recv_wq, &con->awork);
return(0); /* don't want to delete.. */
} else {
/* set_bit(READ_SCHED, &con->state); */
- queue_work(recvwq, &con->rwork);
+ queue_work(recv_wq, &con->rwork);
}
}
if (mask & POLLOUT) {
printk(KERN_INFO "socket read ready: %d\n", mask);
/* set_bit(WRITE_SCHED, &con>state); */
- queue_work(sendwq, &con->swork);
+ queue_work(send_wq, &con->swork);
}
if (mask & (POLLERR | POLLHUP)) {
- printk(KERN_INFO "poll error: %d\n", mask);
- /* TBD: handle error need may need reconnect */
+ printk(KERN_INFO "poll hangup or error: %d\n", mask);
+ set_bit(CLOSED, &con->state);
}
return mask;
}
/*
* Poll thread function, start after creating listener connection
*/
-static int ceph_poll(struct ceph_pollable *plist)
+static int ceph_poll(void *arg)
{
- struct ceph_pollable *pos, *tmp;
+ struct ceph_pollable *pos, *next;
+ struct ceph_pollable *pollables = arg;
printk(KERN_INFO "starting kernel poll thread\n");
* doesn't have to wait for the timeout of the previous socket
* this will work better for a large number of file descriptors
*/
- list_for_each_entry_safe(pos, tmp, plist, ceph_pollable) {
+ list_for_each_entry_safe(pos, next, &pollables->poll_list, poll_list) {
if (do_ceph_pollfd(pos->file, NULL)) {
- /* remove file from poll_list */
spin_lock(&pos->plock);
- list_del(pos->poll_list);
+ /* remove file from poll_list */
+ list_del(&pos->poll_list);
spin_unlock(&pos->plock);
/* TBD: free list entry or reuse..Need reuse list */
/* double check not freeing out from undermyself*/
kfree(pos);
}
}
- schedule_timeout(timeout);
+ schedule_timeout(5*HZ); /* TBD: make configurable */
}
set_current_state(TASK_RUNNING);
printk(KERN_INFO "kernel thread exiting\n");
struct ceph_poll_task *start_poll()
{
- struct ceph_poll_task ptsk;
- struct ceph_pollable pfiles;
+ struct ceph_poll_task *ptsk;
+ struct ceph_pollable *pfiles;
ptsk = kmalloc(sizeof(struct ceph_poll_task), GFP_KERNEL);
if (ptsk == NULL) {
return 0;
}
memset(pfiles, 0, sizeof(pfiles));
- INIT_LIST_HEAD(&pfiles->poll_list)
+ INIT_LIST_HEAD(&pfiles->poll_list);
/* start up poll thread */
ptsk->poll_task = kthread_run(ceph_poll, pfiles, "ceph-poll");
+ if (IS_ERR(ptsk->poll_task)) {
+ /* cleanup */
+ }
return(ptsk);
}
-int stop_poll(struct ceph_poll_task *ptsk)
+void stop_poll(struct ceph_poll_task *ptsk)
{
kthread_stop(ptsk->poll_task);
wake_up_process(ptsk->poll_task);
/* Free up poll structures.. */
}
+
+/*
+ * Initialize the work queues
+ */
+
+int ceph_work_init(void)
+{
+ int ret = 0;
+
+ /*
+ * Create a num CPU threads to handle receive requests
+ * note: we can create more threads if needed to even out
+ * the scheduling of multiple requests..
+ */
+ recv_wq = create_workqueue("ceph recv");
+ ret = IS_ERR(recv_wq);
+ if (ret) {
+ printk(KERN_INFO "receive worker failed to start: %d\n", ret);
+ destroy_workqueue(recv_wq);
+ return ret;
+ }
+
+ /*
+ * Create a single thread to handle send requests
+ * note: may use same thread pool as receive workers later...
+ */
+ send_wq = create_singlethread_workqueue("ceph send");
+ ret = IS_ERR(send_wq);
+ if (ret) {
+ printk(KERN_INFO "send worker failed to start: %d\n", ret);
+ destroy_workqueue(send_wq);
+ return ret;
+ }
+
+ return(ret);
+}
+
+void ceph_work_shutdown(void)
+{
+ destroy_workqueue(send_wq);
+ destroy_workqueue(recv_wq);
+}