]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: make con_close, con_reopen work
authorSage Weil <sage@newdream.net>
Fri, 28 Aug 2009 21:27:18 +0000 (14:27 -0700)
committerSage Weil <sage@newdream.net>
Fri, 28 Aug 2009 21:27:18 +0000 (14:27 -0700)
src/kernel/messenger.c
src/kernel/messenger.h

index 1f42b110fb99fcb38154530466eaaac24e81e6fb..56c00c3a6dcc1c0cf80f08aa1933412a65c1c522 100644 (file)
@@ -232,6 +232,25 @@ static int con_close_socket(struct ceph_connection *con)
        return rc;
 }
 
+/*
+ * Reset a connection.  Discard all incoming and outgoing messages
+ * and clear *_seq state.
+ */
+static void reset_connection(struct ceph_connection *con)
+{
+       /* reset connection, out_queue, msg_ and connect_seq */
+       /* discard existing out_queue and msg_seq */
+       spin_lock(&con->out_queue_lock);
+       ceph_msg_put_list(&con->out_queue);
+       ceph_msg_put_list(&con->out_sent);
+
+       con->connect_seq = 0;
+       con->out_seq = 0;
+       con->out_msg = NULL;
+       con->in_seq = 0;
+       spin_unlock(&con->out_queue_lock);
+}
+
 /*
  * mark a peer down.  drop any open connections.
  */
@@ -240,6 +259,7 @@ void ceph_con_close(struct ceph_connection *con)
        dout("close %p peer %u.%u.%u.%u:%u\n", con,
             IPQUADPORT(con->peer_addr.ipaddr));
        set_bit(CLOSED, &con->state);  /* in case there's queued work */
+       reset_connection(con);
        queue_con(con);
 }
 
@@ -249,12 +269,24 @@ void ceph_con_close(struct ceph_connection *con)
 void ceph_con_destroy(struct ceph_connection *con)
 {
        dout("con_destroy %p destroying\n", con);
-       ceph_msg_put_list(&con->out_queue);
-       ceph_msg_put_list(&con->out_sent);
-       set_bit(CLOSED, &con->state);
+       reset_connection(con);
+       set_bit(DEAD, &con->state);
        con_close_socket(con); /* silently ignore errors */
 }
 
+/*
+ * Reopen a closed connection, with a new peer address.
+ */
+void ceph_con_reopen(struct ceph_connection *con, struct ceph_entity_addr *addr)
+{
+       dout("con_reopen %p %u.%u.%u.%u:%u\n", con, IPQUADPORT(addr->ipaddr));
+       BUG_ON(!test_bit(CLOSED, &con->state));
+       set_bit(REOPEN, &con->state);
+       clear_bit(CLOSED, &con->state);
+       memcpy(&con->peer_addr, addr, sizeof(*addr));
+       queue_con(con);
+}
+
 /*
  * generic get/put
  */
@@ -727,31 +759,6 @@ static int verify_hello(struct ceph_connection *con)
        return 0;
 }
 
-/*
- * Reset a connection.  Discard all incoming and outgoing messages
- * and clear *_seq state.
- */
-static void reset_connection(struct ceph_connection *con)
-{
-       pr_err("ceph %s%d %u.%u.%u.%u:%u connection reset\n",
-              ENTITY_NAME(con->peer_name),
-              IPQUADPORT(con->peer_addr.ipaddr));
-
-       /* reset connection, out_queue, msg_ and connect_seq */
-       /* discard existing out_queue and msg_seq */
-       spin_lock(&con->out_queue_lock);
-       ceph_msg_put_list(&con->out_queue);
-       ceph_msg_put_list(&con->out_sent);
-
-       con->connect_seq = 0;
-       con->out_seq = 0;
-       con->out_msg = NULL;
-       con->in_seq = 0;
-       con->in_msg = NULL;
-       spin_unlock(&con->out_queue_lock);
-}
-
-
 static int process_connect(struct ceph_connection *con)
 {
        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -791,6 +798,9 @@ static int process_connect(struct ceph_connection *con)
                 */
                dout("process_connect got RESET peer seq %u\n",
                     le32_to_cpu(con->in_connect.connect_seq));
+               pr_err("ceph %s%d %u.%u.%u.%u:%u connection reset\n",
+                      ENTITY_NAME(con->peer_name),
+                      IPQUADPORT(con->peer_addr.ipaddr));
                reset_connection(con);
                prepare_write_connect_retry(con->msgr, con);
                prepare_read_connect(con);
@@ -1123,24 +1133,27 @@ no_data:
  */
 static void process_message(struct ceph_connection *con)
 {
+       struct ceph_msg *msg = con->in_msg;
+
+       con->in_msg = NULL;
+
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
-               con->peer_name = con->in_msg->hdr.src.name;
+               con->peer_name = msg->hdr.src.name;
 
        spin_lock(&con->out_queue_lock);
        con->in_seq++;
        spin_unlock(&con->out_queue_lock);
 
        dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n",
-            con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
-            ENTITY_NAME(con->in_msg->hdr.src.name),
-            le16_to_cpu(con->in_msg->hdr.type),
-            ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
-            le32_to_cpu(con->in_msg->hdr.front_len),
-            le32_to_cpu(con->in_msg->hdr.data_len),
+            msg, le64_to_cpu(msg->hdr.seq),
+            ENTITY_NAME(msg->hdr.src.name),
+            le16_to_cpu(msg->hdr.type),
+            ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
+            le32_to_cpu(msg->hdr.front_len),
+            le32_to_cpu(msg->hdr.data_len),
             con->in_front_crc, con->in_middle_crc, con->in_data_crc);
-       con->ops->dispatch(con, con->in_msg);
-       con->in_msg = NULL;
+       con->ops->dispatch(con, msg);
        prepare_read_tag(con);
 }
 
@@ -1276,7 +1289,7 @@ more:
                ret = read_partial_connect(con);
                if (ret <= 0)
                        goto done;
-               if (process_connect(con) < 0) {
+               if (process_connect(con) < 0 || test_bit(WAIT, &con->state)) {
                        ret = -1;
                        goto out;
                }
@@ -1384,8 +1397,8 @@ bad_tag:
 static void queue_con(struct ceph_connection *con)
 {
        if (test_bit(WAIT, &con->state) ||
-           test_bit(CLOSED, &con->state)) {
-               dout("queue_con %p ignoring: WAIT|CLOSED\n",
+           test_bit(DEAD, &con->state)) {
+               dout("queue_con %p ignoring: WAIT|DEAD\n",
                     con);
                return;
        }
@@ -1424,15 +1437,14 @@ more:
 
        if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
                dout("con_work CLOSED\n");
-               ceph_con_destroy(con);
-               if (test_and_clear_bit(REOPEN, &con->state)) {
-                       /* reopen w/ new peer */
-                       dout("con_work REPOPEN\n");
-                       clear_bit(CLOSED, &con->state);
-                       goto more;
-               }
+               con_close_socket(con);
                goto done;
        }
+       if (test_and_clear_bit(REOPEN, &con->state)) {
+               /* reopen w/ new peer */
+               dout("con_work REOPEN\n");
+               con_close_socket(con);
+       }
        if (test_bit(WAIT, &con->state)) {   /* we are a zombie */
                dout("con_work WAIT\n");
                goto done;
@@ -1544,6 +1556,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
                dout("create ip not specified, initially INADDR_ANY\n");
                msgr->inst.addr.ipaddr.sin_addr.s_addr = htonl(INADDR_ANY);
                msgr->inst.addr.ipaddr.sin_port = htons(0);  /* any port */
+               msgr->inst.addr.nonce = get_random_int();
        }
        msgr->inst.addr.ipaddr.sin_family = AF_INET;
 
@@ -1773,7 +1786,6 @@ void ceph_msg_kfree(struct ceph_msg *m)
        else
                kfree(m->front.iov_base);
        kfree(m);
-       dout("msg_kfree %p done\n", m);
 }
 
 /*
index 051b9411bce7833a0828ae861e74fb7e326ef830..465999c5f6b25ab120d0210fd463589a0e9bb668 100644 (file)
@@ -135,6 +135,7 @@ struct ceph_msg_pos {
 #define SOCK_CLOSED    11 /* socket state changed to closed */
 #define REGISTERED      12 /* connection appears in con_tree */
 #define REOPEN          13 /* reopen connection w/ new peer */
+#define DEAD            14 /* dead, about to kfree */
 
 /*
  * A single connection with another host.
@@ -223,6 +224,8 @@ extern void ceph_con_destroy(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 extern void ceph_con_keepalive(struct ceph_connection *con);
 extern void ceph_con_close(struct ceph_connection *con);
+extern void ceph_con_reopen(struct ceph_connection *con,
+                           struct ceph_entity_addr *addr);
 extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
 extern void ceph_con_put(struct ceph_connection *con);