]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: multithread
authorSage Weil <sage@newdream.net>
Thu, 19 Jun 2008 03:07:17 +0000 (20:07 -0700)
committerSage Weil <sage@newdream.net>
Thu, 19 Jun 2008 03:07:17 +0000 (20:07 -0700)
src/kernel/ktcp.c
src/kernel/messenger.c
src/kernel/messenger.h

index cd4c23485580e83ee0a46314a31e762f1d93a42a..3dc90732adcd0cae1068dd38f2c462d6b09b2f5e 100644 (file)
@@ -11,7 +11,6 @@ int ceph_debug_tcp;
 #include "super.h"
 
 struct workqueue_struct *con_wq;
-struct workqueue_struct *accept_wq;    /* accept work queue */
 
 
 /*
@@ -25,8 +24,8 @@ static void ceph_accept_ready(struct sock *sk, int count_unused)
 
        dout(30, "ceph_accept_ready messenger %p sk_state = %u\n",
             msgr, sk->sk_state);
-       if (msgr && (sk->sk_state == TCP_LISTEN))
-               queue_work(accept_wq, &msgr->awork);
+       if (sk->sk_state == TCP_LISTEN)
+               queue_work(con_wq, &msgr->awork);
 }
 
 /* Data available on socket or listen socket received a connect */
@@ -34,11 +33,7 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
 {
        struct ceph_connection *con =
                (struct ceph_connection *)sk->sk_user_data;
-       if (con &&
-           !test_bit(NEW, &con->state) &&
-           !test_bit(WAIT, &con->state) &&
-           !test_bit(CLOSED, &con->state) &&
-           (sk->sk_state != TCP_CLOSE_WAIT)) {
+       if (sk->sk_state != TCP_CLOSE_WAIT) {
                dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
                     con, con->state);
                ceph_queue_con(con);
@@ -54,11 +49,7 @@ static void ceph_write_space(struct sock *sk)
        dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
 
        /* only queue to workqueue if a WRITE is pending */
-       if (con &&
-           !test_bit(NEW, &con->state) &&
-           !test_bit(WAIT, &con->state) &&
-           !test_bit(CLOSED, &con->state) &&
-           test_bit(WRITE_PENDING, &con->state)) {
+       if (test_bit(WRITE_PENDING, &con->state)) {
                dout(30, "ceph_write_space %p queuing write work\n", con);
                ceph_queue_con(con);
        }
@@ -72,11 +63,6 @@ static void ceph_state_change(struct sock *sk)
 {
        struct ceph_connection *con =
                (struct ceph_connection *)sk->sk_user_data;
-       if (con == NULL ||
-           test_bit(NEW, &con->state) ||
-           test_bit(CLOSED, &con->state) ||
-           test_bit(WAIT, &con->state))
-               return;
 
        dout(30, "ceph_state_change %p state = %lu sk_state = %u\n",
             con, con->state, sk->sk_state);
@@ -309,14 +295,7 @@ int ceph_workqueue_init(void)
 {
        int ret = 0;
 
-       dout(20, "entered work_init\n");
-
-       /*
-        * this needs to be a single threaded queue until we
-        * have add a way to ensure that each connection is only
-        * being processed by a single thread at a time.
-        */
-       con_wq = create_singlethread_workqueue("ceph-net");
+       con_wq = create_workqueue("ceph-net");
        if (IS_ERR(con_wq)) {
                derr(0, "net worker failed to start: %d\n", ret);
                destroy_workqueue(con_wq);
@@ -325,17 +304,6 @@ int ceph_workqueue_init(void)
                return ret;
        }
 
-       accept_wq = create_singlethread_workqueue("ceph-accept");
-       if (IS_ERR(accept_wq)) {
-               derr(0, "net worker failed to start: %d\n", ret);
-               destroy_workqueue(accept_wq);
-               ret = PTR_ERR(accept_wq);
-               accept_wq = 0;
-               return ret;
-       }
-
-       dout(20, "successfully created wrkqueues\n");
-
        return(ret);
 }
 
@@ -345,5 +313,4 @@ int ceph_workqueue_init(void)
 void ceph_workqueue_shutdown(void)
 {
        destroy_workqueue(con_wq);
-       destroy_workqueue(accept_wq);
 }
index 20ad85a4a7bbb4747b3cbefb41c4150d78b8baf6..4e93d59725d92cbd2ab3523325c9b2b76987ca2a 100644 (file)
@@ -235,11 +235,20 @@ static void remove_connection(struct ceph_messenger *msgr,
  */
 void ceph_queue_con(struct ceph_connection *con)
 {
+       if (test_bit(WAIT, &con->state) ||
+           test_bit(CLOSED, &con->state)) {
+               dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED\n", con);
+               return;
+       }
+
        atomic_inc(&con->nref);
        dout(40, "ceph_queue_con %p %d -> %d\n", con,
             atomic_read(&con->nref) - 1, atomic_read(&con->nref));
-       if (!queue_work(con_wq, &con->work.work)) {
-               dout(40, "ceph_queue_write %p - already queued\n", con);
+       
+       set_bit(QUEUED, &con->state);
+       if (test_bit(BUSY, &con->state) ||
+           !queue_work(con_wq, &con->work.work)) {
+               dout(40, "ceph_queue_write %p - already BUSY or queued\n", con);
                put_connection(con);
        }
 }
@@ -250,9 +259,12 @@ void ceph_queue_con_delayed(struct ceph_connection *con)
        dout(40, "ceph_queue_con_delayed %p delay %lu %d -> %d\n", con,
             con->delay,
             atomic_read(&con->nref) - 1, atomic_read(&con->nref));
-       if (!queue_delayed_work(con_wq, &con->work,
+       set_bit(QUEUED, &con->state);
+       if (test_bit(BUSY, &con->state) ||
+           !queue_delayed_work(con_wq, &con->work,
                                round_jiffies_relative(con->delay))) {
-               dout(40, "ceph_queue_con_delayed %p - already queued\n", con);
+               dout(40, "ceph_queue_con_delayed %p - already BUSY or queued\n",
+                    con);
                put_connection(con);
        }
 }
@@ -1237,7 +1249,15 @@ static void con_work(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                   work.work);
-
+       
+more:
+       if (test_and_set_bit(BUSY, &con->state) != 0) {
+               dout(10, "con_work %p BUSY already set\n", con);
+               goto out;
+       }
+       dout(10, "con_work %p start, clearing QUEUED\n", con);
+       clear_bit(QUEUED, &con->state);
+       
        if (test_bit(CLOSED, &con->state) ||
            test_bit(STANDBY, &con->state)) {
                dout(5, "con_work CLOSED|STANDBY\n");
@@ -1250,6 +1270,14 @@ static void con_work(struct work_struct *work)
                ceph_fault(con);
 
 done:
+       clear_bit(BUSY, &con->state);
+       if (test_bit(QUEUED, &con->state)) {
+               dout(10, "con_work %p QUEUED reset, looping\n", con);
+               goto more;
+       }
+       dout(10, "con_work %p done\n", con);
+
+out:
        put_connection(con);
 }
 
index a0b8b8895e39a1f04aa6d6d1650b33482c1ff1fd..7f339e1477824096e49cce630870d8068beed03a 100644 (file)
@@ -76,7 +76,8 @@ struct ceph_msg_pos {
 #define ACCEPTING      2
 #define OPEN           3
 #define WRITE_PENDING  4  /* we have data to send */
-#define NOSOCK          5  /* mask socket callbacks */
+#define QUEUED          5  /* there is work to be done */
+#define BUSY            6  /* work is being done */
 #define WAIT           7  /* wait for peer to connect */
 #define CLOSED         8  /* we've closed the connection */
 #define SOCK_CLOSE     9  /* socket state changed to close */