]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
added more structure, not finished ofcourse
authorpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 8 Nov 2007 18:57:34 +0000 (18:57 +0000)
committerpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 8 Nov 2007 18:57:34 +0000 (18:57 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2035 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/messenger.c

index 86766f897d6a14aa915e7debf7d2e413c462dfee..999c49f541e1c84dc5fdf8ba46c6d4b62f568c9d 100644 (file)
@@ -39,7 +39,8 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
        con->msgr = msgr;
 
        spin_lock_init(&con->con_lock);
-       /*INIT_WORK(&con->rwork, ceph_reader);*/        /* setup work structure */
+       INIT_WORK(&con->rwork, try_read);       /* setup work structure */
+       INIT_WORK(&con->rwork, try_write);      /* setup work structure */
 
        atomic_inc(&con->nref);
        return con;
@@ -528,55 +529,59 @@ bad:
 }
 
 
-
-
-
 /*
  * Accepter thread
  */
-static int ceph_accepter(struct ceph_messenger *msgr)
+static int try_accept(struct work_struct *work)
 {
        struct socket *sd, *new_sd;
        struct sockaddr saddr;
-       struct ceph_connection *con = NULL;
-
-       memset(&saddr, 0, sizeof(saddr));
-
-        printk(KERN_INFO "starting kernel thread\n");
-        set_current_state(TASK_INTERRUPTIBLE);
-
-       /* TBD: if address specified by mount */
-               /* make my address from user specified address, fill in saddr */
-
-       sd = _klisten(&saddr);
-
-        /* an endless loop in which we are accepting connections */
-        while (!kthread_should_stop()) {
-               /* TBD: should we schedule? or will accept take care of this? */
-               new_sd = _kaccept(sd);
-                printk(KERN_INFO "accepted connection \n");
-                set_current_state(TASK_INTERRUPTIBLE);
-               /* initialize the msgr connection */
-               con = new_connection(msgr);
-               if (con == NULL) {
-                       printk(KERN_INFO "malloc failure\n");
-                       sock_release(new_sd);
-                       break;
-               }
-               con->sock = new_sd;
-               con->state = ACCEPTING;
-               con->in_tag = CEPH_MSGR_TAG_READY;
+       struct ceph_connection *con;
+        struct ceph_connection *new_con = NULL;
+
+       con = container_of(work, struct ceph_connection, awork);
+
 
-               prepare_write_accept_announce(msgr, con);
+        printk(KERN_INFO "Entered try_accept\n");
 
-               add_connection_accepting(msgr, con);
 
-               /* hand off to worker threads , read pending */
-               /*?? queue_work(recv_wq, &con->rwork);*/
+        if(kernel_accept(sd, &new_sd, sd->file->f_flags) < 0) {
+               printk(KERN_INFO "error accepting connection \n");
+                goto done;
         }
-        set_current_state(TASK_RUNNING);
-        printk(KERN_INFO "kernel thread exiting\n");
-       sock_release(sd);
+        printk(KERN_INFO "accepted connection \n");
+
+        /* get the address at the other end */
+        memset(&saddr, 0, sizeof(saddr));
+        if (new_sd->ops->getname(new_sd, saddr, &len, 2)) {
+                printk(KERN_INFO "getname error connection aborted\n");
+                sock_release(new_sd);
+                goto done;
+        }
+
+       /* initialize the msgr connection */
+       new_con = new_connection(msgr);
+       if (new_con == NULL) {
+                       printk(KERN_INFO "malloc failure\n");
+               sock_release(new_sd);
+               goto done;
+               }
+       new_con->sock = new_sd;
+       setbit(ACCEPTING, &con->state);
+       new_con->in_tag = CEPH_MSGR_TAG_READY;
+       new_con->peeraddr = saddr;
+/* TBD: may not use this.. */
+       new_sd->sk->sk_user_data = con;
+
+       prepare_write_accept_announce(msgr, con);
+
+       add_connection_accepting(msgr, con);
+
+/* add to poll list? or hand off to send workqueue? */
+
+       /* hand off to worker threads , send pending */
+       /*?? queue_work(send_wq, &new_con->swork);*/
+done:
         return(0);
 }
 
@@ -609,8 +614,6 @@ int ceph_work_init(void)
                 destroy_workqueue(send_wq);
                 return ret;
         }
-/* TBD: need to do this during mount, one per kmsgr */
-       athread = kthread_run(ceph_accepter, NULL, "ceph accepter thread");
 
         return(ret);
 }
@@ -627,3 +630,58 @@ void ceph_work_shutdown(void)
        destroy_workqueue(send_wq);
        destroy_workqueue(recv_wq);
 }
+
+struct ceph_connection *new_listener(struct ceph_messenger *msgr)
+{
+       struct ceph_connection *con;
+        struct sockaddr saddr;
+        memset(&saddr, 0, sizeof(saddr));
+
+        con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+        if (con == NULL) return 0;
+        memset(&con, 0, sizeof(con));
+
+       /* create listener connection */
+        spin_lock_init(&con->con_lock);
+        INIT_WORK(&con->awork, try_accept);       /* setup work structure */
+        con->msgr = msgr;
+        atomic_inc(&con->nref);
+
+        /* TBD: if address specified by mount */
+                /* make my address from user specified address, fill in saddr */
+
+        con->sock = _klisten(&saddr);
+       return(con);
+}
+
+/*
+ * create a new messenger
+ */
+static struct ceph_messenger *new_messenger()
+{
+        struct ceph_messenger *msgr;
+        struct ceph_connection *con;
+
+        msgr = kmalloc(sizeof(struct ceph_messenger), GFP_KERNEL);
+        if (msgr == NULL) return 0;
+        memset(&msgr, 0, sizeof(msgr));
+
+        con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+        if (con == NULL) return 0;
+        memset(&con, 0, sizeof(con));
+
+       /* create listener connection */
+        spin_lock_init(&con->con_lock);
+        INIT_WORK(&con->awork, try_accept);       /* setup work structure */
+        con->msgr = msgr;
+        atomic_inc(&con->nref);
+
+       /* start up poll thread */
+       athread = kthread_run(ceph_poll, NULL, "ceph poll thread"); */
+        return msgr;
+}
+
+struct ceph_messenger *init_messenger()
+{
+
+}