]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: close socket in work queue (and still on last ref.)
authorSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 20:03:25 +0000 (13:03 -0700)
committerSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 20:03:25 +0000 (13:03 -0700)
src/kernel/messenger.c
src/kernel/messenger.h

index e16c8aa89f9bd4fc1062caf4d05a8d98e24e0202..5d788f390e28b38f9a850c1bd5cfc9d51e601a8c 100644 (file)
@@ -38,7 +38,7 @@ static char tag_ack = CEPH_MSGR_TAG_ACK;
 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 
 
-static void ceph_queue_con(struct ceph_connection *con);
+static void queue_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
 static void ceph_fault(struct ceph_connection *con);
 
@@ -77,7 +77,7 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
        if (sk->sk_state != TCP_CLOSE_WAIT) {
                dout("ceph_data_ready on %p state = %lu, queueing work\n",
                     con, con->state);
-               ceph_queue_con(con);
+               queue_con(con);
        }
 }
 
@@ -90,7 +90,7 @@ static void ceph_write_space(struct sock *sk)
        /* only queue to workqueue if there is data we want to write. */
        if (test_bit(WRITE_PENDING, &con->state)) {
                dout("ceph_write_space %p queueing write work\n", con);
-               ceph_queue_con(con);
+               queue_con(con);
        } else {
                dout("ceph_write_space %p nothing to write\n", con);
        }
@@ -121,11 +121,11 @@ static void ceph_state_change(struct sock *sk)
                        con->error_msg = "connection failed";
                else
                        con->error_msg = "socket closed";
-               ceph_queue_con(con);
+               queue_con(con);
                break;
        case TCP_ESTABLISHED:
                dout("ceph_state_change TCP_ESTABLISHED\n");
-               ceph_queue_con(con);
+               queue_con(con);
                break;
        }
 }
@@ -240,6 +240,7 @@ void ceph_con_close(struct ceph_connection *con)
        dout("close %p peer %u.%u.%u.%u:%u\n", con,
             IPQUADPORT(con->peer_addr.ipaddr));
        set_bit(CLOSED, &con->state);  /* in case there's queued work */
+       queue_con(con);
 }
 
 /*
@@ -1393,27 +1394,27 @@ bad_tag:
  * we give up (work also already being done or is queued) but leave QUEUED
  * set so that the worker thread will loop if necessary.
  */
-static void ceph_queue_con(struct ceph_connection *con)
+static void queue_con(struct ceph_connection *con)
 {
        if (test_bit(WAIT, &con->state) ||
            test_bit(CLOSED, &con->state)) {
-               dout("ceph_queue_con %p ignoring: WAIT|CLOSED\n",
+               dout("queue_con %p ignoring: WAIT|CLOSED\n",
                     con);
                return;
        }
 
        if (!con->ops->get(con)) {
-               dout("ceph_queue_con %p ref count 0\n", con);
+               dout("queue_con %p ref count 0\n", con);
                return;
        }
 
        set_bit(QUEUED, &con->state);
        if (test_bit(BUSY, &con->state) ||
            !queue_work(ceph_msgr_wq, &con->work.work)) {
-               dout("ceph_queue_con %p - already BUSY or queued\n", con);
+               dout("queue_con %p - already BUSY or queued\n", con);
                con->ops->put(con);
        } else {
-               dout("ceph_queue_con %p\n", con);
+               dout("queue_con %p\n", con);
        }
 }
 
@@ -1436,6 +1437,13 @@ more:
 
        if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
                dout("con_work CLOSED\n");
+               ceph_con_destroy(con);
+               if (test_and_clear_bit(REOPEN, &con->state)) {
+                       /* reopen w/ new peer */
+                       dout("con_work REPOPEN\n");
+                       clear_bit(CLOSED, &con->state);
+                       goto more;
+               }
                goto done;
        }
        if (test_bit(WAIT, &con->state)) {   /* we are a zombie */
@@ -1608,7 +1616,13 @@ struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
  */
 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 {
-       /* set source */
+       if (test_bit(CLOSED, &con->state)) {
+               dout("con_send %p closed, dropping %p\n", con, msg);
+               ceph_msg_put(msg);
+               return;
+       }
+
+       /* set src+dst */
        msg->hdr.src = con->msgr->inst;
        msg->hdr.orig_src = con->msgr->inst;
        msg->hdr.dst.addr = con->peer_addr;
@@ -1633,7 +1647,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        /* if there wasn't anything waiting to send before, queue
         * new work */
        if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
-               ceph_queue_con(con);
+               queue_con(con);
 }
 
 /*
@@ -1643,7 +1657,7 @@ void ceph_con_keepalive(struct ceph_connection *con)
 {
        if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
            test_and_set_bit(WRITE_PENDING, &con->state) == 0)
-               ceph_queue_con(con);
+               queue_con(con);
 }
 
 
index e8e7e3cce488c06dc01e7513ad67ab354f57fb2a..051b9411bce7833a0828ae861e74fb7e326ef830 100644 (file)
@@ -134,6 +134,7 @@ struct ceph_msg_pos {
 #define CLOSED         10 /* we've closed the connection */
 #define SOCK_CLOSED    11 /* socket state changed to closed */
 #define REGISTERED      12 /* connection appears in con_tree */
+#define REOPEN          13 /* reopen connection w/ new peer */
 
 /*
  * A single connection with another host.