From 3336509ebf460512ff1b3e9403829e67805e7b94 Mon Sep 17 00:00:00 2001 From: patiencew Date: Thu, 8 Nov 2007 18:57:34 +0000 Subject: [PATCH] added more structure, not finished ofcourse git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2035 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/messenger.c | 142 ++++++++++++++++++++++++---------- 1 file changed, 100 insertions(+), 42 deletions(-) diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 86766f897d6a1..999c49f541e1c 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -39,7 +39,8 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) con->msgr = msgr; spin_lock_init(&con->con_lock); - /*INIT_WORK(&con->rwork, ceph_reader);*/ /* setup work structure */ + INIT_WORK(&con->rwork, try_read); /* setup work structure */ + INIT_WORK(&con->rwork, try_write); /* setup work structure */ atomic_inc(&con->nref); return con; @@ -528,55 +529,59 @@ bad: } - - - /* * Accepter thread */ -static int ceph_accepter(struct ceph_messenger *msgr) +static int try_accept(struct work_struct *work) { struct socket *sd, *new_sd; struct sockaddr saddr; - struct ceph_connection *con = NULL; - - memset(&saddr, 0, sizeof(saddr)); - - printk(KERN_INFO "starting kernel thread\n"); - set_current_state(TASK_INTERRUPTIBLE); - - /* TBD: if address specified by mount */ - /* make my address from user specified address, fill in saddr */ - - sd = _klisten(&saddr); - - /* an endless loop in which we are accepting connections */ - while (!kthread_should_stop()) { - /* TBD: should we schedule? or will accept take care of this? */ - new_sd = _kaccept(sd); - printk(KERN_INFO "accepted connection \n"); - set_current_state(TASK_INTERRUPTIBLE); - /* initialize the msgr connection */ - con = new_connection(msgr); - if (con == NULL) { - printk(KERN_INFO "malloc failure\n"); - sock_release(new_sd); - break; - } - con->sock = new_sd; - con->state = ACCEPTING; - con->in_tag = CEPH_MSGR_TAG_READY; + struct ceph_connection *con; + struct ceph_connection *new_con = NULL; + + con = container_of(work, struct ceph_connection, awork); + - prepare_write_accept_announce(msgr, con); + printk(KERN_INFO "Entered try_accept\n"); - add_connection_accepting(msgr, con); - /* hand off to worker threads , read pending */ - /*?? queue_work(recv_wq, &con->rwork);*/ + if(kernel_accept(sd, &new_sd, sd->file->f_flags) < 0) { + printk(KERN_INFO "error accepting connection \n"); + goto done; } - set_current_state(TASK_RUNNING); - printk(KERN_INFO "kernel thread exiting\n"); - sock_release(sd); + printk(KERN_INFO "accepted connection \n"); + + /* get the address at the other end */ + memset(&saddr, 0, sizeof(saddr)); + if (new_sd->ops->getname(new_sd, saddr, &len, 2)) { + printk(KERN_INFO "getname error connection aborted\n"); + sock_release(new_sd); + goto done; + } + + /* initialize the msgr connection */ + new_con = new_connection(msgr); + if (new_con == NULL) { + printk(KERN_INFO "malloc failure\n"); + sock_release(new_sd); + goto done; + } + new_con->sock = new_sd; + setbit(ACCEPTING, &con->state); + new_con->in_tag = CEPH_MSGR_TAG_READY; + new_con->peeraddr = saddr; +/* TBD: may not use this.. */ + new_sd->sk->sk_user_data = con; + + prepare_write_accept_announce(msgr, con); + + add_connection_accepting(msgr, con); + +/* add to poll list? or hand off to send workqueue? */ + + /* hand off to worker threads , send pending */ + /*?? queue_work(send_wq, &new_con->swork);*/ +done: return(0); } @@ -609,8 +614,6 @@ int ceph_work_init(void) destroy_workqueue(send_wq); return ret; } -/* TBD: need to do this during mount, one per kmsgr */ - athread = kthread_run(ceph_accepter, NULL, "ceph accepter thread"); return(ret); } @@ -627,3 +630,58 @@ void ceph_work_shutdown(void) destroy_workqueue(send_wq); destroy_workqueue(recv_wq); } + +struct ceph_connection *new_listener(struct ceph_messenger *msgr) +{ + struct ceph_connection *con; + struct sockaddr saddr; + memset(&saddr, 0, sizeof(saddr)); + + con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL); + if (con == NULL) return 0; + memset(&con, 0, sizeof(con)); + + /* create listener connection */ + spin_lock_init(&con->con_lock); + INIT_WORK(&con->awork, try_accept); /* setup work structure */ + con->msgr = msgr; + atomic_inc(&con->nref); + + /* TBD: if address specified by mount */ + /* make my address from user specified address, fill in saddr */ + + con->sock = _klisten(&saddr); + return(con); +} + +/* + * create a new messenger + */ +static struct ceph_messenger *new_messenger() +{ + struct ceph_messenger *msgr; + struct ceph_connection *con; + + msgr = kmalloc(sizeof(struct ceph_messenger), GFP_KERNEL); + if (msgr == NULL) return 0; + memset(&msgr, 0, sizeof(msgr)); + + con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL); + if (con == NULL) return 0; + memset(&con, 0, sizeof(con)); + + /* create listener connection */ + spin_lock_init(&con->con_lock); + INIT_WORK(&con->awork, try_accept); /* setup work structure */ + con->msgr = msgr; + atomic_inc(&con->nref); + + /* start up poll thread */ + athread = kthread_run(ceph_poll, NULL, "ceph poll thread"); */ + return msgr; +} + +struct ceph_messenger *init_messenger() +{ + +} -- 2.39.5