git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: fixed various msgr bugs on incoming connections
author Sage Weil <sage@newdream.net>
Wed, 30 Apr 2008 00:58:16 +0000 (17:58 -0700)
committer Sage Weil <sage@newdream.net>
Wed, 30 Apr 2008 00:58:16 +0000 (17:58 -0700)
src/TODO
src/kernel/client.c
src/kernel/messenger.c

index 175613780719f8405ccb6b0dc6b68383cefd470d..308d262bc3ebbc10363c4b1c9341747674d59a7d 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -62,6 +62,7 @@ pgmon
 - watch osd utilization; adjust overload in cluster map
 
 mon
+- use standby mds on mds failure.. not just mds boot
 - paxos need to clean up old states.
 - some sort of tester for PaxosService...
 - osdmon needs to lower-bound old osdmap versions it keeps around?
index 5aed7698ed27d6f0e46df207553825f859cb66dc..455a7e1dad9568e7e8c6cc849c67e735abde046e 100644 (file)
@@ -343,6 +343,7 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_PING: return "ping";
        case CEPH_MSG_PING_ACK: return "ping_ack";
        case CEPH_MSG_MON_MAP: return "mon_map";
+       case CEPH_MSG_MON_GET_MAP: return "mon_get_map";
        case CEPH_MSG_CLIENT_MOUNT: return "client_mount";
        case CEPH_MSG_CLIENT_UNMOUNT: return "client_unmount";
        case CEPH_MSG_STATFS: return "statfs";
index 1306249b66b542a242ced6b0459e729b24c3d549..207dd41e64014b8cbbe20230d9747780fbd152a0 100644 (file)
@@ -267,6 +267,10 @@ static void ceph_fault(struct ceph_connection *con)
        dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
             con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
 
+       ceph_debug_console = 1;
+       ceph_debug_msgr = 40;
+       ceph_debug = 40;
+
        /* PW if never get here remove */
        if (test_bit(WAIT, &con->state)) {
                derr(30, "fault socket close during WAIT state\n");
@@ -488,6 +492,11 @@ static void prepare_write_ack(struct ceph_connection *con)
        set_bit(WRITE_PENDING, &con->state);
 }
 
+static void prepare_read_connect(struct ceph_connection *con)
+{
+       con->in_base_pos = 0;
+}
+
 static void prepare_write_connect(struct ceph_messenger *msgr,
                                  struct ceph_connection *con)
 {
@@ -502,6 +511,18 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        set_bit(WRITE_PENDING, &con->state);
 }
 
+static void prepare_write_connect_retry(struct ceph_messenger *msgr,
+                                       struct ceph_connection *con)
+{
+       con->out_connect_seq = cpu_to_le32(con->connect_seq);
+       con->out_kvec[0].iov_base = &con->out_connect_seq;
+       con->out_kvec[0].iov_len = 4;
+       con->out_kvec_left = 1;
+       con->out_kvec_bytes = 4;
+       con->out_kvec_cur = con->out_kvec;
+       set_bit(WRITE_PENDING, &con->state);
+}
+
 static void prepare_write_accept_announce(struct ceph_messenger *msgr,
                                          struct ceph_connection *con)
 {
@@ -566,6 +587,7 @@ more:
                if (test_and_clear_bit(STANDBY, &con->state))
                        con->connect_seq++;
                prepare_write_connect(msgr, con);
+               prepare_read_connect(con);
                set_bit(CONNECTING, &con->state);
                con->in_tag = CEPH_MSGR_TAG_READY;
                dout(5, "try_write initiating connect on %p new state %lu\n",
@@ -924,7 +946,7 @@ static void process_connect(struct ceph_connection *con)
                dout(10, "process_connect got RESET peer seq %u\n",
                     le32_to_cpu(con->in_connect_seq));
                reset_connection(con);
-               prepare_write_connect(con->msgr, con);
+               prepare_write_connect_retry(con->msgr, con);
                con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
                break;
        case CEPH_MSGR_TAG_RETRY:
@@ -933,7 +955,7 @@ static void process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->out_connect_seq),
                     le32_to_cpu(con->in_connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect_seq);
-               prepare_write_connect(con->msgr, con);
+               prepare_write_connect_retry(con->msgr, con);
                break;
        case CEPH_MSGR_TAG_WAIT:
                dout(10, "process_connect peer connecting WAIT\n");
@@ -1013,10 +1035,8 @@ static void __replace_connection(struct ceph_messenger *msgr,
        new->out_seq = old->out_seq;
 
        /* replace list entry */
-       spin_lock(&msgr->con_lock);
        list_add(&new->list_bucket, &old->list_bucket);
        list_del_init(&old->list_bucket);
-       spin_unlock(&msgr->con_lock);
 
        set_bit(CLOSED, &old->state);
        put_connection(old); /* dec reference count */
@@ -1091,7 +1111,7 @@ static void process_accept(struct ceph_connection *con)
                prepare_write_accept_reply(con, &tag_ready);
        }
        spin_unlock(&msgr->con_lock);
-       /* queue write */
+
        ceph_queue_write(con);
        put_connection(con);
 }
@@ -1219,6 +1239,12 @@ static void try_accept(struct work_struct *work)
                derr(1, "kmalloc failure accepting new connection\n");
                goto done;
        }
+
+       new_con->connect_seq = 1;
+       set_bit(ACCEPTING, &new_con->state);
+       clear_bit(NEW, &new_con->state);
+       new_con->in_tag = CEPH_MSGR_TAG_READY;  /* eventually, hopefully */
+
        if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
                derr(1, "error accepting connection\n");
                put_connection(new_con);
@@ -1226,9 +1252,6 @@ static void try_accept(struct work_struct *work)
        }
        dout(5, "accepted connection \n");
 
-       new_con->connect_seq = 1;
-       set_bit(ACCEPTING, &new_con->state);
-       clear_bit(NEW, &new_con->state);
        prepare_write_accept_announce(msgr, new_con);
        add_connection_accepting(msgr, new_con);