]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Fixed up fault handling some, and some other bugs..
authorPatience Warnick <patience@cranium.pelton.net>
Tue, 22 Jan 2008 01:29:20 +0000 (17:29 -0800)
committerPatience Warnick <patience@cranium.pelton.net>
Tue, 22 Jan 2008 01:35:46 +0000 (17:35 -0800)
Merge branch 'rados' of ssh://patience@ceph.newdream.net/home/sage/ceph.newdream.net/git/ceph into
rados

Conflicts:

src/kernel/messenger.c

src/kernel/messenger.c

index 4ccfe9ceff44f74e8529cf81131237023105e928..fcd0dcbb1a6c2e79e548b53e383bedbbadec677e 100644 (file)
@@ -22,7 +22,6 @@ static char tag_ack = CEPH_MSGR_TAG_ACK;
 
 static void try_read(struct work_struct *);
 static void try_write(struct work_struct *);
-static void try_write_later(struct work_struct *);
 static void try_accept(struct work_struct *);
 
 
@@ -54,8 +53,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
        INIT_LIST_HEAD(&con->out_sent);
 
        INIT_WORK(&con->rwork, try_read);
-       INIT_WORK(&con->swork, try_write);
-       INIT_DELAYED_WORK(&con->delaywork, try_write_later);
+        INIT_DELAYED_WORK(&con->swork, try_write);
 
        return con;
 }
@@ -119,7 +117,8 @@ static void put_connection(struct ceph_connection *con)
        dout(20, "put_connection nref = %d\n", atomic_read(&con->nref));
        if (atomic_dec_and_test(&con->nref)) {
                dout(20, "put_connection destroying %p\n", con);
-               sock_release(con->sock);
+               if (con->sock)
+                       sock_release(con->sock);
                kfree(con);
                con = NULL;
        }
@@ -224,25 +223,19 @@ static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connec
  */
 void ceph_queue_write(struct ceph_connection *con)
 {
-       if (test_and_set_bit(WRITEABLE, &con->state) != 0) {
-               dout(40, "ceph_queue_write %p - already WRITEABLE\n", con);
-               return;
-       }
-       if (test_bit(WRITING, &con->state)) {
-               dout(40, "ceph_queue_write %p - still WRITING, queuing nothing\n", con);
-               return;
-       } 
        dout(40, "ceph_queue_write %p\n", con);
        atomic_inc(&con->nref);
-       queue_work(send_wq, &con->swork);
+       if (!queue_work(send_wq, &con->swork.work)) {
+               dout(40, "ceph_queue_write %p - already queued\n", con);
+               put_connection(con);
+       }
 }
 
 void ceph_queue_delayed_write(struct ceph_connection *con)
 {
        dout(40, "ceph_queue_delayed_write %p delay %lu\n", con, con->delay);
        atomic_inc(&con->nref);
-       set_bit(WAITING, &con->state);
-       if (!queue_delayed_work(send_wq, &con->delaywork, con->delay)) {
+       if (!queue_delayed_work(send_wq, &con->swork, con->delay)) {
                dout(40, "ceph_queue_delayed_write %p - already queued\n", con);
                put_connection(con);
        }
@@ -266,82 +259,28 @@ void ceph_queue_read(struct ceph_connection *con)
  * failure case
  * A retry mechanism is used with exponential backoff
  */
-static void ceph_send_fault(struct ceph_connection *con, int error)
+static void ceph_send_fault(struct ceph_connection *con)
 {
-       derr(1, "ceph_send_fault %p state %lu error %d to peer %x:%d\n", 
-            con, con->state, error,
+       derr(1, "ceph_send_fault %p state %lu to peer %x:%d\n", 
+            con, con->state,
             ntohl(con->peer_addr.ipaddr.sin_addr.s_addr),
             ntohs(con->peer_addr.ipaddr.sin_port));
 
-       if (!con->delay) {
-               /*
-                * lossy channel.  don't bother with any retry.
-                */
-               if (error == -EAGAIN) {
-                       derr(1, "ceph_send_fault lossy channel, ignoring EAGAIN\n");
-                       return;
-               }
-               dout(1, "ceph_send_fault lossy channel, giving up\n");
-               remove_connection(con->msgr, con);
-               return;
+       if (!test_and_clear_bit(CONNECTING, &con->state)){
+               derr(1, "CONNECTING bit not set\n");
+               /* TBD: reset buffer correctly */
+               /* reset buffer */
+               spin_lock(&con->out_queue_lock);
+               list_splice_init(&con->out_sent, &con->out_queue);
+               spin_unlock(&con->out_queue_lock);
+               clear_bit(OPEN, &con->state);
        }
+       set_bit(NEW, &con->state);
+       clear_bit(CLOSED, &con->state);
+       sock_release(con->sock);
+       /* retry with delay */
+       ceph_queue_delayed_write(con);
 
-       switch (error) {
-               case -ETIMEDOUT:
-                       derr(10, "timed out to peer %x:%d\n",
-                            ntohl(con->peer_addr.ipaddr.sin_addr.s_addr),
-                            ntohs(con->peer_addr.ipaddr.sin_port));
-               /* peer unreachable, TBD: need to research if should retry
-                * a few times first, then close socket and try reconnecting */
-               case -EHOSTDOWN:
-               case -EHOSTUNREACH:
-               case -ENETUNREACH:
-                       derr(10, "ceph_send_fault peer unreachable via route state = %lu\n", con->state);
-                       /* TBD: reset buffer correctly */
-                       /* spin_lock(&con->out_queue_lock);
-                       list_splice_init(&con->out_sent, &con->out_queue);
-                       spin_unlock(&con->out_queue_lock); */
-                       /* retry with delay */
-                       ceph_queue_delayed_write(con);
-                       break;
-               case -EAGAIN:
-                       /* no space in socket buffer, ceph_write_space will
-                        * handle requeueing */
-                       if (test_bit(OPEN, &con->state))
-                               return;
-               case -EPIPE:
-               /* TBD: may do something different here, may not be allowed */
-               case -ECONNREFUSED:
-               case -ECONNRESET:
-               /* never connected socket. SOCK_DONE flag not set */
-               case -ENOTCONN:
-                       derr(10, "ceph_send_fault no connection state = %lu\n", con->state);
-                       /* TBD: setup timeout here */
-                       if (!test_and_clear_bit(CONNECTING, &con->state)){
-                               derr(1, "CONNECTING bit not set\n");
-                               /* TBD: reset buffer correctly */
-                               /* reset buffer */
-                               spin_lock(&con->out_queue_lock);
-                               list_splice_init(&con->out_sent, 
-                                                &con->out_queue);
-                               spin_unlock(&con->out_queue_lock);
-                               clear_bit(OPEN, &con->state);
-                       }
-                       set_bit(NEW, &con->state);
-                       clear_bit(CLOSED, &con->state);
-                       sock_release(con->sock);
-                       /* retry with delay */
-                       ceph_queue_delayed_write(con);
-                       break;
-               case -EIO:
-                       derr(10, "ceph_send_fault EIO set\n");
-                       /* shutdown or soft timeout */
-
-               default:
-                       /* if we ever hit here ... */
-                       derr(10, "ceph_send_fault unrecognized error %d\n", 
-                            error);
-       }
        if (con->delay < MAX_DELAY_INTERVAL)
                con->delay *= 2;
        else
@@ -518,21 +457,14 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag)
 /*
  * worker function when socket is writeable
  */
-static void _try_write(struct ceph_connection *con) 
+static void try_write(struct work_struct *work)
 {
+       struct ceph_connection *con = container_of(work, struct ceph_connection, swork.work);
        struct ceph_messenger *msgr = con->msgr;
        int ret = 1;
 
        dout(30, "try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref));
 
-retry:
-       clear_bit(WAITING, &con->state);
-       if (test_and_set_bit(WRITING, &con->state)) {
-               dout(20, "try_write already writing\n");
-               goto out;
-       }
-       clear_bit(WRITEABLE, &con->state);
-
        /* Peer initiated close, other end is waiting for us to close */
        if (test_and_clear_bit(CLOSING, &con->state)) {
                dout(5, "try_write peer initiated close\n");
@@ -540,11 +472,27 @@ retry:
                remove_connection(msgr, con);
                goto done;
        }
+
        if (test_bit(CLOSED, &con->state)) {
                dout(5, "try_write closed\n");
                remove_connection(msgr, con);
                goto done;
        }
+
+       if (con->sock && con->sock->sk->sk_state == TCP_CLOSE) {
+               dout(5, "try_write TCP_CLOSE received\n");
+               if (!con->delay) {
+                       dout(5, "try_write tcp_close delay = 0\n");
+                       remove_connection(msgr, con);
+                       goto done;
+               } else {
+                       dout(5, "try_write tcp_close delay != 0\n");
+                       ceph_send_fault(con);
+               /* PW hmm, keep getting tcp_close so need to drop through.. 
+                 * I want to play around with it a little more.. */
+               /*      goto done; */
+               }
+       }
 more:
        dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
@@ -556,9 +504,7 @@ more:
                ret = ceph_tcp_connect(con);
                dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state);
                if (ret < 0) {
-                       /* fault */
-                       derr(1, "try_write connect error\n");
-                       ceph_send_fault(con, ret);
+                       derr(1, "try_write connect error, maybe we need to cleanup?\n");
                        goto done;
                }
        }
@@ -574,7 +520,8 @@ more:
                        remove_connection(msgr, con);
                }
                if (ret < 0) {
-                       ceph_send_fault(con, ret);
+                       /* TBD: seems to always close on error,  so remove this once we're sure */
+                       /* ceph_send_fault(con, ret); */
                        goto done; /* error */
                }
        }
@@ -585,7 +532,8 @@ more:
                if (ret == 0) 
                        goto done;
                if (ret < 0) {
-                       ceph_send_fault(con, ret);
+                       /* TBD: seems to always close on error,  so remove this once we're sure */
+                       /* ceph_send_fault(con, ret); */
                        goto done;
                }
        }
@@ -601,39 +549,19 @@ more:
                /* hmm, nothing to do! No more writes pending? */
                dout(30, "try_write nothing else to write.\n");
                spin_unlock(&con->out_queue_lock);
+               /* TBD PW remove from wait queue if still there.. and put_connection */
+               if (con->delay > 1) con->delay = BASE_DELAY_INTERVAL;
                goto done;
        }
        spin_unlock(&con->out_queue_lock);
        goto more;
 
 done:
-       clear_bit(WRITING, &con->state);
-       if (test_bit(WRITEABLE, &con->state) && !test_bit(WAITING, &con->state)) {
-               dout(30, "try_write writeable flag set again, looping\n");
-               goto retry;
-       }
-
-out:
        dout(30, "try_write done on %p\n", con);
        put_connection(con);
        return;
 }
 
-static void try_write_later(struct work_struct *work)
-{
-       struct ceph_connection *con;
-       con = container_of(work, struct ceph_connection, delaywork.work);
-       _try_write(con);
-}
-
-static void try_write(struct work_struct *work)
-{
-       struct ceph_connection *con;
-       con = container_of(work, struct ceph_connection, swork);
-       _try_write(con);
-}
-
-
 /* 
  * prepare to read a message
  */
@@ -1131,6 +1059,7 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_ad
                dout(10, "mark_down dropping %p\n", con);
                set_bit(CLOSED, &con->state);  /* in case there is queued work */
                __remove_connection(msgr, con);
+               put_connection(con);
        }
        spin_unlock(&msgr->con_lock);
 }