new_con->in_tag = CEPH_MSGR_TAG_READY;
/* TBD: fill in part of peers address */
/* new_con->peeraddr = saddr; */
-/* TBD: may not use this.. */
- new_sd->sk->sk_user_data = con;
prepare_write_accept_announce(msgr, con);
memset(&saddr, 0, sizeof(saddr));
con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
- if (con == NULL) return 0;
+ if (con == NULL) return NULL;
memset(&con, 0, sizeof(con));
/* create listener connection */
static struct ceph_messenger *new_messenger(void)
{
struct ceph_messenger *msgr;
- struct ceph_connection *con;
- struct ceph_pollable *pfiles;
+ struct ceph_connection *listener;
msgr = kmalloc(sizeof(struct ceph_messenger), GFP_KERNEL);
- if (msgr == NULL) return 0;
+ if (msgr == NULL)
+ goto done;
memset(&msgr, 0, sizeof(msgr));
spin_lock_init(&msgr->con_lock);
-
- pfiles = kmalloc(sizeof(struct ceph_poll_task), GFP_KERNEL);
- if (pfiles == NULL) {
- kfree(msgr);
- return 0;
- }
+ INIT_LIST_HEAD(&msgr->poll_list);
/* create listener connection */
- con = new_listener(msgr);
+ listener = new_listener(msgr);
+ if (listener == NULL)
+ goto err;
- /* start polling */
- msgr->poll_task = start_poll();
+ list_add(&msgr->poll_list, &listener->poll_list);
+ /* start up poll thread */
+ msgr->poll_task = kthread_run(start_poll, msgr, "ceph-poll");
+ if (IS_ERR(msgr->poll_task))
+ goto err;
- /* add listener to pollable files
- * TBD: maybe do this before start polling */
- pfiles->con = con;
- pfiles->file = con->sock->file;
- list_add(&pfiles->poll_list, &msgr->poll_task->pfiles->poll_list); /* add to poll list */
-
+done:
return msgr;
+err:
+ kfree(msgr);
+ msgr = NULL;
+ goto done;
}
struct ceph_messenger {
void *parent;
ceph_messenger_dispatch_t dispatch;
- struct ceph_poll_task *poll_task;
+ struct task_struct *poll_task;
struct ceph_connection *listen_con; /* listening connection */
struct ceph_entity_addr addr; /* my address */
spinlock_t con_lock;
struct list_head con_all; /* all connections */
struct list_head con_accepting; /* doing handshake, or */
+ struct list_head poll_list; /* connections polling */
struct radix_tree_root con_open; /* established. see get_connection() */
};
struct list_head list_all; /* msgr->con_all */
struct list_head list_bucket; /* msgr->con_open or con_accepting */
+ struct list_head poll_list; /* msgr->poll_list */
struct ceph_entity_addr peer_addr; /* peer address */
enum ceph_connection_state state;
/* TBD: probably remove pwait, but may play around with it some..
* null for now.. No timeout, timeout maybe ignored if O_NONBLOCK anyway..
*/
-static int do_ceph_pollfd(struct file *file, poll_table *pwait)
+static int do_ceph_poll(struct ceph_connection *con, poll_table *pwait)
{
int mask;
- struct socket *sock = (struct socket *)file->private_data;
- struct sock *sk = sock->sk;
- /* may keep connection in poll list instead of using this field */
- struct ceph_connection *con =
- (struct ceph_connection *)sk->sk_user_data;
+ struct file *file = con->sock->file;
mask = file->f_op->poll(file, pwait);
/*
* Poll thread function, start after creating listener connection
*/
-static int ceph_poll(void *arg)
+int start_polling(void *arg)
{
- struct ceph_pollable *pos, *next;
- struct ceph_pollable *pollables = arg;
+ struct ceph_connection *pos, *next;
+ struct ceph_messenger *pollables = arg;
printk(KERN_INFO "starting kernel poll thread\n");
* this will work better for a large number of file descriptors
*/
list_for_each_entry_safe(pos, next, &pollables->poll_list, poll_list) {
- if (do_ceph_pollfd(pos->file, NULL)) {
- spin_lock(&pos->plock);
+ if (do_ceph_poll(pos, NULL)) {
+ spin_lock(&pos->con_lock);
/* remove file from poll_list */
list_del(&pos->poll_list);
- spin_unlock(&pos->plock);
+ spin_unlock(&pos->con_lock);
/* TBD: free list entry or reuse..Need reuse list */
/* double check not freeing out from undermyself*/
- kfree(pos);
+ /* kfree(pos); */
}
}
schedule_timeout(5*HZ); /* TBD: make configurable */
return(0);
}
-struct ceph_poll_task *start_poll()
+void stop_polling(struct ceph_messenger *msgr)
{
- struct ceph_poll_task *ptsk;
- struct ceph_pollable *pfiles;
-
- ptsk = kmalloc(sizeof(struct ceph_poll_task), GFP_KERNEL);
- if (ptsk == NULL) {
- return 0;
- }
- memset(&ptsk, 0, sizeof(ptsk));
-
- pfiles = kmalloc(sizeof(struct ceph_poll_task), GFP_KERNEL);
- if (pfiles == NULL) {
- kfree(ptsk);
- return 0;
- }
- memset(pfiles, 0, sizeof(pfiles));
- INIT_LIST_HEAD(&pfiles->poll_list);
-
- /* start up poll thread */
- ptsk->poll_task = kthread_run(ceph_poll, pfiles, "ceph-poll");
- if (IS_ERR(ptsk->poll_task)) {
- /* cleanup */
- }
-
- return(ptsk);
-}
-
-void stop_poll(struct ceph_poll_task *ptsk)
-{
- kthread_stop(ptsk->poll_task);
- wake_up_process(ptsk->poll_task);
- /* Free up poll structures.. */
+ kthread_stop(msgr->poll_task);
+ wake_up_process(msgr->poll_task);
}
/*
#ifndef __FS_CEPH_POLL_H
#define __FS_CEPH_POLL_H
-struct ceph_poll_task *start_poll(void);
-
-/* list of pollable files */
-struct ceph_pollable {
- spinlock_t plock;
- struct list_head poll_list;
- struct file *file;
- struct ceph_connection *con;
-};
+int start_poll(void *);
+/*
+ * May use, probably not..
+ */
struct ceph_poll_task {
struct task_struct *poll_task;
- struct ceph_pollable *pfiles;
+ struct list_head poll_list;
u64 timeout;
};