]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rework get/add/replace connection msgr spinlocks
authorSage Weil <sage@newdream.net>
Wed, 16 Jan 2008 19:04:32 +0000 (11:04 -0800)
committerSage Weil <sage@newdream.net>
Wed, 16 Jan 2008 19:04:32 +0000 (11:04 -0800)
src/kernel/messenger.c

index 8a2af20ead6c7819990ea89c564b85b15251ced2..d7ef53d38ee4644513669cf9be7b12c25e8dcfcf 100644 (file)
@@ -153,14 +153,13 @@ static unsigned long hash_addr(struct ceph_entity_addr *addr)
 /* 
  * get an existing connection, if any, for given addr
  */
-static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
+static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
 {
        struct ceph_connection *con = NULL;
        struct list_head *head, *p;
        unsigned long key = hash_addr(addr);
 
        /* existing? */
-       spin_lock(&msgr->con_lock);
        head = radix_tree_lookup(&msgr->con_open, key);
        if (head == NULL) 
                goto out;
@@ -181,10 +180,11 @@ static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struc
        }
        con = NULL;
 out:
-       spin_unlock(&msgr->con_lock);
        return con;
 }
 
+
+
 /* 
  * drop a reference
  */
@@ -203,7 +203,7 @@ static void put_connection(struct ceph_connection *con)
 /* 
  * add to connections tree
  */
-static void add_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
 {
        struct list_head *head;
        unsigned long key = hash_addr(&con->peer_addr);
@@ -211,7 +211,6 @@ static void add_connection(struct ceph_messenger *msgr, struct ceph_connection *
        /* inc ref count */
        atomic_inc(&con->nref);
 
-       spin_lock(&msgr->con_lock);
        /* PW this is bogus... needs to be readdressed later */
        if (test_bit(ACCEPTING, &con->state)) {
                list_del(&con->list_bucket);
@@ -229,7 +228,6 @@ static void add_connection(struct ceph_messenger *msgr, struct ceph_connection *
                INIT_LIST_HEAD(&con->list_bucket); /* empty */
                radix_tree_insert(&msgr->con_open, key, &con->list_bucket);
        }
-       spin_unlock(&msgr->con_lock);
 }
 
 static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con)
@@ -279,7 +277,7 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio
  * replace another connection
  *  (old and new should be for the _same_ peer, and thus in the same pos in the radix tree)
  */
-static void replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
+static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
 {
        spin_lock(&msgr->con_lock);
        list_add(&new->list_bucket, &old->list_bucket);
@@ -479,7 +477,7 @@ more:
                set_bit(CONNECTING, &con->state);
                dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state);
                ret = ceph_tcp_connect(con);
-               dout(30, "try_write returned from connect ret = %d state = %lu", ret, con->state);
+               dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state);
                if (ret < 0) {
                        /* fault */
                        derr(1, "try_write connect error\n");
@@ -782,19 +780,20 @@ static int read_accept_partial(struct ceph_connection *con)
 static void process_accept(struct ceph_connection *con)
 {
        struct ceph_connection *existing;
+       struct ceph_messenger *msgr = con->msgr;
 
        /* do we already have a connection for this peer? */
-       spin_lock(&con->msgr->con_lock);
-       existing = get_connection(con->msgr, &con->peer_addr);
+       spin_lock(&msgr->con_lock);
+       existing = __get_connection(msgr, &con->peer_addr);
        if (existing) {
                //spin_lock(&existing->lock);
                /* replace existing connection? */
                if ((test_bit(CONNECTING, &existing->state) && 
-                    ceph_entity_addr_equal(&con->msgr->inst.addr, &con->peer_addr)) ||
+                    ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) ||
                    (test_bit(OPEN, &existing->state) && 
                     con->connect_seq == existing->connect_seq)) {
                        /* replace existing with new connection */
-                       replace_connection(con->msgr, existing, con);
+                       __replace_connection(msgr, existing, con);
                        /* steal message queue */
                        list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */
                        con->out_seq = existing->out_seq;
@@ -809,10 +808,10 @@ static void process_accept(struct ceph_connection *con)
                //spin_unlock(&existing->lock);
                put_connection(existing);
        } else {
-               add_connection(con->msgr, con);
+               __add_connection(msgr, con);
                set_bit(OPEN, &con->state);
        }
-       spin_unlock(&con->msgr->con_lock);
+       spin_unlock(&msgr->con_lock);
 
        /* the result? */
        clear_bit(ACCEPTING, &con->state);
@@ -1035,7 +1034,8 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        msg->hdr.src = msgr->inst;
 
        /* do we have the connection? */
-       con = get_connection(msgr, &msg->hdr.dst.addr);
+       spin_lock(&msgr->con_lock);
+       con = __get_connection(msgr, &msg->hdr.dst.addr);
        if (!con) {
                con = new_connection(msgr);
                if (IS_ERR(con))
@@ -1044,12 +1044,13 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                     ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), 
                     ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
                con->peer_addr = msg->hdr.dst.addr;
-               add_connection(msgr, con);
+               __add_connection(msgr, con);
        } else {
                dout(10, "ceph_msg_send had connection %p to peer %x:%d\n", con,
                     ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
                     ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
-       }                    
+       }
+       spin_unlock(&msgr->con_lock);
        con->delay = timeout;
        dout(10, "ceph_msg_send delay = %lu\n", con->delay); 
        /* queue */