]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
builds somewhat..
authorpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 10 Nov 2007 22:39:56 +0000 (22:39 +0000)
committerpatiencew <patiencew@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 10 Nov 2007 22:39:56 +0000 (22:39 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2045 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/messenger.c

index 684ea439d47ee08cb04da0e7ebef21a79ddb71f9..62e22210c6ae90bc24d4f263ccbff0c1ae4fb29a 100644 (file)
@@ -9,9 +9,6 @@
 #include "messenger.h"
 #include "ktcp.h"
 
-static struct workqueue_struct *recv_wq;        /* receive work queue ) */
-static struct workqueue_struct *send_wq;        /* send work queue */
-
 /* static tag bytes */
 static char tag_ready = CEPH_MSGR_TAG_READY;
 static char tag_reject = CEPH_MSGR_TAG_REJECT;
@@ -19,6 +16,9 @@ static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
 static char tag_close = CEPH_MSGR_TAG_CLOSE;
 
+static void try_read(struct work_struct *);
+static void try_write(struct work_struct *);
+static void try_accept(struct work_struct *);
 
 /*
  * connections
@@ -38,7 +38,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
 
        spin_lock_init(&con->con_lock);
        INIT_WORK(&con->rwork, try_read);       /* setup work structure */
-       INIT_WORK(&con->rwork, try_write);      /* setup work structure */
+       INIT_WORK(&con->swork, try_write);      /* setup work structure */
 
        atomic_inc(&con->nref);
        return con;
@@ -190,7 +190,7 @@ static int write_partial(struct ceph_connection *con)
 more:
        len = bl->b_kv[p->i_kv].iov_len - p->i_off;
        /* FIXME */
-       ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len);
+       /* ret = kernel_send(con->sock, bl->b_kv[p->i_kv].iov_base + p->i_off, len); */
        if (ret < 0) return ret;
        if (ret == 0) return 0;   /* socket full */
        if (ret + p->i_off == bl->b_kv[p->i_kv].iov_len) {
@@ -271,18 +271,20 @@ static void prepare_write_accept_reject(struct ceph_connection *con)
 /*
  * call when socket is writeable
  */
-static int try_write(struct ceph_messenger *msgr, struct work_struct *work)
+static void try_write(struct work_struct *work)
 {
        int ret;
        struct ceph_connection *con;
+       struct ceph_messenger *msgr;
 
        con = container_of(work, struct ceph_connection, swork);
+       msgr = con->msgr;
 
 more:
        /* data queued? */
        if (con->out_partial.b_kvlen) {
                ret = write_partial(con);
-               if (ret == 0) return 0;
+               if (ret == 0) goto done;
 
                /* error or success */
                /* clean up */
@@ -296,7 +298,8 @@ more:
                        put_connection(con);
                }
                
-               if (ret < 0) return ret; /* error */
+               /* TBD: handle error; return for now */
+               if (ret < 0) goto done; /* error */
        }
        
        /* anything else pending? */
@@ -310,7 +313,8 @@ more:
        }
        
        /* hmm, nothing to do! */
-       return 0;
+done:
+       return;
 }
 
 
@@ -320,11 +324,12 @@ more:
 static int read_message_partial(struct ceph_connection *con)
 {
        struct ceph_message *m = con->in_partial;
-       int left, ret, s, chunkbytes, c, did;
+       int ret, s, chunkbytes, c, did;
+       size_t left;
 
        while (con->in_base_pos < sizeof(struct ceph_message_header)) {
                left = sizeof(struct ceph_message_header) - con->in_base_pos;
-               ret = _read(con->sock, &m->hdr + con->in_base_pos, left);
+               ret = _krecvmsg(con->sock, &m->hdr + con->in_base_pos, left, 0);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -334,7 +339,7 @@ static int read_message_partial(struct ceph_connection *con)
        while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) {
                int off = con->in_base_pos - sizeof(struct ceph_message_header);
                left = chunkbytes + sizeof(struct ceph_message_header) - con->in_base_pos;
-               ret = _read(con->sock, (char*)m->chunklens + off, left);
+               ret = _krecvmsg(con->sock, (char*)m->chunklens + off, left, 0);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -349,7 +354,7 @@ static int read_message_partial(struct ceph_connection *con)
                }
                ceph_bl_prepare_append(&m->payload, left);
                s = min(m->payload.b_append.iov_len, left);
-               ret = _read(con->sock, m->payload.b_append.iov_base, s);
+               ret = _krecvmsg(con->sock, m->payload.b_append.iov_base, s, 0);
                if (ret <= 0) return ret;
                ceph_bl_append_copied(&m->payload, s);
                goto more;
@@ -364,7 +369,7 @@ static int read_ack_partial(struct ceph_connection *con)
 {
        while (con->in_base_pos < sizeof(con->in_partial_ack)) {
                int left = sizeof(con->in_partial_ack) - con->in_base_pos;
-               int ret = _read(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left);
+               int ret = _krecvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left, 0);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -374,10 +379,12 @@ static int read_ack_partial(struct ceph_connection *con)
 
 static int read_accept_partial(struct ceph_connection *con)
 {
+       int ret;
+
        /* peer addr */
        while (con->in_base_pos < sizeof(con->peer_addr)) {
                int left = sizeof(con->peer_addr) - con->in_base_pos;
-               int ret = _read(con->sock, (char*)&con->peer_addr + con->in_base_pos, left);
+               ret = _krecvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left, 0);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -386,7 +393,7 @@ static int read_accept_partial(struct ceph_connection *con)
        while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) {
                int off = con->in_base_pos - sizeof(con->peer_addr);
                int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos;
-               ret = _read(con->sock, (char*)&con->connect_seq + off, left);
+               ret = _krecvmsg(con->sock, (char*)&con->connect_seq + off, left,0);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -396,15 +403,22 @@ static int read_accept_partial(struct ceph_connection *con)
 /* 
  * prepare to read a message
  */
-static int prepare_read_message(struct ceph_connection *con)
+static void prepare_read_message(struct ceph_connection *con)
 {
        con->in_tag = CEPH_MSGR_TAG_MSG;
        con->in_base_pos = 0;
        con->in_partial = kmalloc(sizeof(struct ceph_message), GFP_KERNEL);
-       if (!con->in_partial) return -1;  /* crap */
+       if (!con->in_partial) {
+               /* TBD: we don't check for error in caller, handle error */
+               printk(KERN_INFO "malloc failure\n");
+               goto done;
+       }
+
        ceph_get_msg(con->in_partial);
        ceph_bl_init(&con->in_partial->payload);
        ceph_bl_iterator_init(&con->in_pos);
+done:
+       return;
 }
 
 /* 
@@ -473,26 +487,40 @@ static void process_accept(struct ceph_connection *con)
 /*
  * call when data is available on the socket
  */
-static int try_read(struct work_struct *work)
+static void try_read(struct work_struct *work)
 {
        int ret = -1;
        struct ceph_connection *con;
+       struct ceph_messenger *msgr;
 
        con = container_of(work, struct ceph_connection, rwork);
+       msgr = con->msgr;
 
 more:
-       if (con->state == CLOSED) return -1;
-       if (con->state == ACCEPTING) {
+       /*
+        * TBD: maybe store error in ceph_connection
+        * since this is run in a workqueue, we probably need to notify
+         * whoever added the connection to poll list of completion for 
+        * dispatching.  Or error
+         */
+       /* if (con->state == CLOSED) return -1; */
+
+       if (test_bit(CLOSED, &con->state)) goto done;
+       if (test_bit(ACCEPTING, &con->state)) {
                ret = read_accept_partial(con);
-               if (ret <= 0) return ret;
+               /* TBD: do something with error */
+               /* if (ret <= 0) return ret; */
+               if (ret <= 0) goto done;
                /* accepted */
                process_accept(con);
                goto more;
        }
 
        if (con->in_tag == CEPH_MSGR_TAG_READY) {
-               ret = _read(con->sock, &con->in_tag, 1);
-               if (ret <= 0) return ret;
+               ret = _krecvmsg(con->sock, &con->in_tag, 1, 0);
+               /* if (ret <= 0) return ret; */
+               /* TBD: do something with error */
+               if (ret <= 0) goto done;
                if (con->in_tag == CEPH_MSGR_TAG_MSG) 
                        prepare_read_message(con);
                else if (con->in_tag == CEPH_MSGR_TAG_ACK)
@@ -505,7 +533,9 @@ more:
        }
        if (con->in_tag == CEPH_MSGR_TAG_MSG) {
                ret = read_message_partial(con);
-               if (ret <= 0) return ret;
+               /* if (ret <= 0) return ret; */
+               /* TBD: do something with error */
+               if (ret <= 0) goto done;
                /* got a full message! */
                msgr->dispatch(con->msgr->parent, con->in_partial);
                ceph_put_msg(con->in_partial);
@@ -515,7 +545,9 @@ more:
        }
        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
                ret = read_ack_partial(con);
-               if (ret <= 0) return ret;
+               /* if (ret <= 0) return ret; */
+               /* TBD: do something with error */
+               if (ret <= 0) goto done;
                /* got an ack */
                process_ack(con, con->in_partial_ack);
                con->in_tag = CEPH_MSGR_TAG_READY;
@@ -523,21 +555,26 @@ more:
        }
 bad:
        BUG_ON(1); /* shouldn't get here */
-       return ret;
+done:
+       return;
 }
 
 
 /*
  * Accepter thread
  */
-static int try_accept(struct work_struct *work)
+static void try_accept(struct work_struct *work)
 {
        struct socket *sd, *new_sd;
        struct sockaddr saddr;
        struct ceph_connection *con;
         struct ceph_connection *new_con = NULL;
+       struct ceph_messenger *msgr;
+       int len;
 
        con = container_of(work, struct ceph_connection, awork);
+       msgr = con->msgr;
+       sd = con->sock;
 
 
         printk(KERN_INFO "Entered try_accept\n");
@@ -551,7 +588,7 @@ static int try_accept(struct work_struct *work)
 
         /* get the address at the other end */
         memset(&saddr, 0, sizeof(saddr));
-        if (new_sd->ops->getname(new_sd, saddr, &len, 2)) {
+        if (new_sd->ops->getname(new_sd, &saddr, &len, 2)) {
                 printk(KERN_INFO "getname error connection aborted\n");
                 sock_release(new_sd);
                 goto done;
@@ -565,9 +602,10 @@ static int try_accept(struct work_struct *work)
                goto done;
                }
        new_con->sock = new_sd;
-       setbit(ACCEPTING, &con->state);
+       set_bit(ACCEPTING, &con->state);
        new_con->in_tag = CEPH_MSGR_TAG_READY;
-       new_con->peeraddr = saddr;
+       /* TBD: fill in part of peers address */
+       /* new_con->peeraddr = saddr; */
 /* TBD: may not use this.. */
        new_sd->sk->sk_user_data = con;
 
@@ -580,46 +618,7 @@ static int try_accept(struct work_struct *work)
        /* hand off to worker threads , send pending */
        /*?? queue_work(send_wq, &new_con->swork);*/
 done:
-        return(0);
-}
-
-
-int ceph_work_init(void)
-{
-        int ret = 0;
-
-       /*
-        * Create a num CPU threads to handle receive requests
-        * note: we can create more threads if needed to even out
-        * the scheduling of multiple requests.. 
-        */
-        recv_wq = create_workqueue("ceph recv");
-        ret = IS_ERR(recv_wq);
-        if (ret) {
-               printk(KERN_INFO "receive worker failed to start: %d\n", ret);
-                destroy_workqueue(recv_wq);
-                return ret;
-        }
-
-       /*
-        * Create a single thread to handle send requests 
-        * note: may use same thread pool as receive workers later...
-        */
-        send_wq = create_singlethread_workqueue("ceph send");
-        ret = IS_ERR(send_wq);
-        if (ret) {
-               printk(KERN_INFO "send worker failed to start: %d\n", ret);
-                destroy_workqueue(send_wq);
-                return ret;
-        }
-
-        return(ret);
-}
-
-void ceph_work_shutdown(void)
-{
-       destroy_workqueue(send_wq);
-       destroy_workqueue(recv_wq);
+        return;
 }
 
 struct ceph_connection *new_listener(struct ceph_messenger *msgr)
@@ -648,11 +647,11 @@ struct ceph_connection *new_listener(struct ceph_messenger *msgr)
 /*
  * create a new messenger
  */
-static struct ceph_messenger *new_messenger()
+static struct ceph_messenger *new_messenger(void)
 {
         struct ceph_messenger *msgr;
         struct ceph_connection *con;
-       struct ceph_pollable *pfile;
+       struct ceph_pollable *pfiles;
 
         msgr = kmalloc(sizeof(struct ceph_messenger), GFP_KERNEL);
         if (msgr == NULL) return 0;
@@ -677,7 +676,7 @@ static struct ceph_messenger *new_messenger()
          * TBD: maybe do this before start polling */
        pfiles->con = con;
        pfiles->file = con->sock->file;
-       list_add(&pfiles->poll_list);  /* add to poll list */
+       list_add(&pfiles->poll_list, &msgr->poll_task->pfiles->poll_list);  /* add to poll list */
 
         return msgr;
 }