]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: some msgr cleanups
authorSage Weil <sage@newdream.net>
Thu, 24 Apr 2008 17:28:42 +0000 (10:28 -0700)
committerSage Weil <sage@newdream.net>
Thu, 24 Apr 2008 17:28:42 +0000 (10:28 -0700)
src/kernel/messenger.c
src/kernel/messenger.h

index 995a893ca0c4cc8c8bdf012ff394252984ced209..ce7e940fb5f9e13da074940f773696f22d4d5608 100644 (file)
@@ -86,7 +86,7 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
        unsigned long key = hash_addr(addr);
 
        /* existing? */
-       head = radix_tree_lookup(&msgr->con_open, key);
+       head = radix_tree_lookup(&msgr->con_tree, key);
        if (head == NULL)
                goto out;
        con = list_entry(head, struct ceph_connection, list_bucket);
@@ -126,8 +126,8 @@ static void put_connection(struct ceph_connection *con)
 /*
  * add to connections tree
  */
-static void __add_connection(struct ceph_messenger *msgr,
-                            struct ceph_connection *con)
+static void __register_connection(struct ceph_messenger *msgr,
+                                 struct ceph_connection *con)
 {
        struct list_head *head;
        unsigned long key = hash_addr(&con->peer_addr);
@@ -138,11 +138,10 @@ static void __add_connection(struct ceph_messenger *msgr,
        if (test_and_clear_bit(ACCEPTING, &con->state)) {
                list_del_init(&con->list_bucket);
                put_connection(con);
-       } else {
+       } else
                list_add(&con->list_all, &msgr->con_all);
-       }
 
-       head = radix_tree_lookup(&msgr->con_open, key);
+       head = radix_tree_lookup(&msgr->con_tree, key);
        if (head) {
                dout(20, "add_connection %p in existing bucket %lu head %p\n",
                     con, key, head);
@@ -151,8 +150,9 @@ static void __add_connection(struct ceph_messenger *msgr,
                dout(20, "add_connection %p in new bucket %lu head %p\n", con,
                     key, &con->list_bucket);
                INIT_LIST_HEAD(&con->list_bucket); /* empty */
-               radix_tree_insert(&msgr->con_open, key, &con->list_bucket);
+               radix_tree_insert(&msgr->con_tree, key, &con->list_bucket);
        }
+       set_bit(REGISTERED, &con->state);
 }
 
 static void add_connection_accepting(struct ceph_messenger *msgr,
@@ -161,13 +161,12 @@ static void add_connection_accepting(struct ceph_messenger *msgr,
        atomic_inc(&con->nref);
        spin_lock(&msgr->con_lock);
        list_add(&con->list_all, &msgr->con_all);
-       list_add(&con->list_bucket, &msgr->con_accepting);
        spin_unlock(&msgr->con_lock);
 }
 
 /*
  * remove connection from all list.
- * also, from con_open radix tree, if it should have been there
+ * also, from con_tree radix tree, if it should have been there
  */
 static void __remove_connection(struct ceph_messenger *msgr,
                                struct ceph_connection *con)
@@ -181,17 +180,16 @@ static void __remove_connection(struct ceph_messenger *msgr,
                return;
        }
        list_del_init(&con->list_all);
-       if (test_bit(CONNECTING, &con->state) ||
-           test_bit(OPEN, &con->state)) {
-               /* remove from con_open too */
+       if (test_bit(REGISTERED, &con->state)) {
+               /* remove from con_tree too */
                key = hash_addr(&con->peer_addr);
                if (list_empty(&con->list_bucket)) {
                        /* last one */
                        dout(20, "__remove_connection %p and bucket %lu\n",
                             con, key);
-                       radix_tree_delete(&msgr->con_open, key);
+                       radix_tree_delete(&msgr->con_tree, key);
                } else {
-                       slot = radix_tree_lookup_slot(&msgr->con_open, key);
+                       slot = radix_tree_lookup_slot(&msgr->con_tree, key);
                        val = radix_tree_deref_slot(slot);
                        dout(20, "__remove_connection %p from bucket %lu "
                             "head %p\n", con, key, val);
@@ -205,6 +203,8 @@ static void __remove_connection(struct ceph_messenger *msgr,
                        list_del_init(&con->list_bucket);
                }
        }
+       if (test_bit(ACCEPTING, &con->state))
+               list_del_init(&con->list_bucket);
        put_connection(con);
 }
 
@@ -216,6 +216,7 @@ static void remove_connection(struct ceph_messenger *msgr,
        spin_unlock(&msgr->con_lock);
 }
 
+
 /*
  * atomically queue read or write work on a connection.
  * bump reference to avoid races.
@@ -259,7 +260,7 @@ void ceph_queue_read(struct ceph_connection *con)
  * failure case
  * A retry mechanism is used with exponential backoff
  */
-static void ceph_send_fault(struct ceph_connection *con)
+static void ceph_fault(struct ceph_connection *con)
 {
        derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name),
             IPQUADPORT(con->peer_addr.ipaddr), con->error_msg);
@@ -532,7 +533,7 @@ static void try_write(struct work_struct *work)
        }
 
        if (test_and_clear_bit(SOCK_CLOSE, &con->state)) {
-               ceph_send_fault(con);
+               ceph_fault(con);
                goto done;
        }
 more:
@@ -548,8 +549,8 @@ more:
                     con, con->state);
                ret = ceph_tcp_connect(con);
                if (ret < 0) {
-                       derr(1, "try_write tcp connect error %d\n", ret);
-                       remove_connection(msgr, con);
+                       con->error_msg = "connect error";
+                       ceph_fault(con);
                        goto done;
                }
        }
@@ -884,7 +885,7 @@ static void process_connect(struct ceph_connection *con)
                derr(1, "process_connect protocol error, will retry\n");
                con->delay = BASE_DELAY_INTERVAL;
                con->error_msg = "protocol error";
-               ceph_send_fault(con);
+               ceph_fault(con);
        }
        if (test_bit(WRITE_PENDING, &con->state))
                ceph_queue_write(con);
@@ -1019,7 +1020,7 @@ static void process_accept(struct ceph_connection *con)
                prepare_write_accept_reply(con, &tag_reset);
        } else {
                dout(20, "process_accept no existing connection, opening\n");
-               __add_connection(msgr, con);
+               __register_connection(msgr, con);
                set_bit(OPEN, &con->state);
                con->connect_seq = peer_cseq + 1;
                prepare_write_accept_reply(con, &tag_ready);
@@ -1212,7 +1213,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        spin_lock_init(&msgr->con_lock);
        INIT_LIST_HEAD(&msgr->con_all);
        INIT_LIST_HEAD(&msgr->con_accepting);
-       INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC);
+       INIT_RADIX_TREE(&msgr->con_tree, GFP_KERNEL);
 
        /* pick listening address */
        if (myaddr) {
@@ -1348,7 +1349,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                        con = newcon;
                        con->peer_addr = msg->hdr.dst.addr;
                        con->peer_name = msg->hdr.dst.name;
-                       __add_connection(msgr, con);
+                       __register_connection(msgr, con);
                        dout(5, "ceph_msg_send new connection %p to peer "
                             "%u.%u.%u.%u:%u\n", con,
                             IPQUADPORT(msg->hdr.dst.addr.ipaddr));
index dc7acfe0de601e97aa08cc965f700f2a755a7a09..e602a79e19085744f55a7e574a7e47adbeb9193d 100644 (file)
@@ -42,8 +42,8 @@ struct ceph_messenger {
        struct work_struct awork;        /* accept work */
        spinlock_t con_lock;
        struct list_head con_all;        /* all connections */
-       struct list_head con_accepting;  /*  doing handshake, or */
-       struct radix_tree_root con_open; /*  established */
+       struct list_head con_accepting;  /* accepting */
+       struct radix_tree_root con_tree; /*  established */
 };
 
 struct ceph_msg {
@@ -76,6 +76,7 @@ struct ceph_msg_pos {
 #define CLOSED         8  /* we've closed the connection */
 #define SOCK_CLOSE     9  /* socket state changed to close */
 #define STANDBY                10 /* standby, when socket state close, no messages */
+#define REGISTERED      11
 
 struct ceph_connection {
        struct ceph_messenger *msgr;
@@ -86,7 +87,7 @@ struct ceph_connection {
        atomic_t nref;
 
        struct list_head list_all;   /* msgr->con_all */
-       struct list_head list_bucket;  /* msgr->con_open or con_accepting */
+       struct list_head list_bucket;  /* msgr->con_tree or con_accepting */
 
        struct ceph_entity_addr peer_addr; /* peer address */
        struct ceph_entity_name peer_name; /* peer name */