]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: fixed msgr connection refcounting, and some cleanups
authorSage Weil <sage@newdream.net>
Wed, 28 May 2008 03:49:27 +0000 (20:49 -0700)
committerSage Weil <sage@newdream.net>
Wed, 28 May 2008 03:49:27 +0000 (20:49 -0700)
src/kernel/ktcp.c
src/kernel/ktcp.h
src/kernel/messenger.c

index 046861d94df7052eb37e3ed4e2be85d83cbb409c..70ecc50245fd79f758fad824dcae66c7aa9c91f5 100644 (file)
@@ -34,30 +34,35 @@ struct kobj_type ceph_socket_type = {
 struct ceph_socket *ceph_socket_create()
 {
        struct ceph_socket *s;
-       int err;
-       
+       int err = -ENOMEM;
+
        s = kzalloc(sizeof(*s), GFP_NOFS);
-       if (!s) {
-               derr(10, "ENOMEM creating ceph_socket\n");
-               return ERR_PTR(-ENOMEM);
-       }
+       if (!s)
+               goto out;
 
        err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &s->sock);
-       if (err) {
-               derr(10, "sock_create_kern error %d\n", err);
-               return ERR_PTR(err);
-       }
+       if (err)
+               goto out_free;
 
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
-       kobject_init_and_add(&s->kobj, &ceph_socket_type,
-                            ceph_sockets_kobj,
-                            "socket %p", s);
+       err = kobject_init_and_add(&s->kobj, &ceph_socket_type,
+                                  ceph_sockets_kobj,
+                                  "socket %p", s);
+       if (err)
+               goto out_release;
 #else
        kobject_init(&s->kobj);
        kobject_set_name(&s->kobj, "socket %p", s);
        s->kobj.ktype = &ceph_socket_type;
 #endif
        return s;
+
+out_release:
+       sock_release(s->sock);
+out_free:
+       kfree(s);
+out:
+       return ERR_PTR(err);
 }
 
 void ceph_socket_get(struct ceph_socket *s)
@@ -69,13 +74,11 @@ void ceph_socket_get(struct ceph_socket *s)
        BUG_ON(!r);
 }
 
-void ceph_socket_put(struct ceph_socket *s, int die)
+void ceph_socket_put(struct ceph_socket *s)
 {
        dout(10, "socket_put %p\n", s);
        if (!s)
                return;
-       if (die)
-               ceph_cancel_sock_callbacks(s);
        kobject_put(&s->kobj);
 }
 
@@ -216,7 +219,8 @@ struct ceph_socket *ceph_tcp_connect(struct ceph_connection *con)
                /* TBD check for fatal errors, retry if not fatal.. */
                derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
                     IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
-               ceph_socket_put(s, 1);
+               ceph_cancel_sock_callbacks(s);
+               ceph_socket_put(s);
                con->s = 0;
        }
 
@@ -287,7 +291,7 @@ int ceph_tcp_listen(struct ceph_messenger *msgr)
        return ret;
 
 err:
-       ceph_socket_put(s, 0);
+       ceph_socket_put(s);
        return ret;
 }
 
@@ -327,7 +331,7 @@ int ceph_tcp_accept(struct ceph_socket *ls, struct ceph_connection *con)
        return ret;
 
 err:
-       ceph_socket_put(s, 0);
+       ceph_socket_put(s);
        con->s = 0;
        return ret;
 }
index 840c8d5a8865b8e7dbc48f581e4e224371ae02b3..59314f37255e648ddf99d23018a8a01c2c539237 100644 (file)
@@ -24,7 +24,7 @@ void ceph_workqueue_shutdown(void);
 
 extern struct ceph_socket *ceph_socket_create(void);
 extern void ceph_socket_get(struct ceph_socket *s);
-extern void ceph_socket_put(struct ceph_socket *s, int die);
+extern void ceph_socket_put(struct ceph_socket *s);
 
 /* Max number of outstanding connections in listener queueu */
 #define NUM_BACKUP 10
index bd32cd321b2f6366e9f14442fe29627201ceb7e5..ea7d33b54b726764acbb9cb6afaa9bfdab88e1b9 100644 (file)
@@ -90,21 +90,21 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
        /* existing? */
        head = radix_tree_lookup(&msgr->con_tree, key);
        if (head == NULL)
-               goto out;
+               return NULL;
        con = list_entry(head, struct ceph_connection, list_bucket);
-       if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
-               atomic_inc(&con->nref);
-               goto out;
-       }
+       if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0)
+               goto yes;
        list_for_each(p, head) {
                con = list_entry(p, struct ceph_connection, list_bucket);
-               if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
-                       atomic_inc(&con->nref);
-                       goto out;
-               }
+               if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0)
+                       goto yes;
        }
-       con = NULL;
-out:
+       return NULL;
+
+yes:
+       atomic_inc(&con->nref);
+       dout(20, "get_connection %p nref = %d -> %d\n", con,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        return con;
 }
 
@@ -115,12 +115,14 @@ out:
  */
 static void put_connection(struct ceph_connection *con)
 {
-       dout(20, "put_connection nref = %d\n", atomic_read(&con->nref));
+       dout(20, "put_connection %p nref = %d -> %d\n", con,
+            atomic_read(&con->nref), atomic_read(&con->nref) - 1);
        if (atomic_dec_and_test(&con->nref)) {
-               dout(20, "put_connection destroying %p\n", con);
+               dout(20, "put_connection %p destroying\n", con);
                ceph_msg_put_list(&con->out_queue);
                ceph_msg_put_list(&con->out_sent);
-               ceph_socket_put(con->s, 1);
+               ceph_cancel_sock_callbacks(con->s);
+               ceph_socket_put(con->s);
                kfree(con);
        }
 }
@@ -136,6 +138,8 @@ static void __register_connection(struct ceph_messenger *msgr,
 
        /* inc ref count */
        atomic_inc(&con->nref);
+       dout(20, "add_connection %p %d -> %d\n", con,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
 
        if (test_and_clear_bit(ACCEPTING, &con->state)) {
                list_del_init(&con->list_bucket);
@@ -161,6 +165,8 @@ static void add_connection_accepting(struct ceph_messenger *msgr,
                                     struct ceph_connection *con)
 {
        atomic_inc(&con->nref);
+       dout(20, "add_connection_accepting %p nref = %d -> %d\n", con,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        spin_lock(&msgr->con_lock);
        list_add(&con->list_all, &msgr->con_all);
        spin_unlock(&msgr->con_lock);
@@ -225,8 +231,9 @@ static void remove_connection(struct ceph_messenger *msgr,
  */
 void ceph_queue_write(struct ceph_connection *con)
 {
-       dout(40, "ceph_queue_write %p\n", con);
        atomic_inc(&con->nref);
+       dout(40, "ceph_queue_write %p %d -> %d\n", con,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        if (!queue_work(send_wq, &con->swork.work)) {
                dout(40, "ceph_queue_write %p - already queued\n", con);
                put_connection(con);
@@ -235,8 +242,10 @@ void ceph_queue_write(struct ceph_connection *con)
 
 void ceph_queue_delayed_write(struct ceph_connection *con)
 {
-       dout(40, "ceph_queue_delayed_write %p delay %lu\n", con, con->delay);
        atomic_inc(&con->nref);
+       dout(40, "ceph_queue_delayed_write %p delay %lu %d -> %d\n", con,
+            con->delay,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        if (!queue_delayed_work(send_wq, &con->swork,
                                round_jiffies_relative(con->delay))) {
                dout(40, "ceph_queue_delayed_write %p - already queued\n", con);
@@ -250,8 +259,9 @@ void ceph_queue_read(struct ceph_connection *con)
                dout(40, "ceph_queue_read %p - already READABLE\n", con);
                return;
        }
-       dout(40, "ceph_queue_read %p\n", con);
        atomic_inc(&con->nref);
+       dout(40, "ceph_queue_read %p %d -> %d\n", con,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        queue_work(recv_wq, &con->rwork);
 }
 
@@ -277,7 +287,8 @@ static void ceph_fault(struct ceph_connection *con)
 
        if (con->delay) {
                dout(30, "fault tcp_close delay != 0\n");
-               ceph_socket_put(con->s, 1);
+               ceph_cancel_sock_callbacks(con->s);
+               ceph_socket_put(con->s);
                con->s = NULL;
                set_bit(NEW, &con->state);
 
@@ -664,7 +675,7 @@ more_kvec:
 
 done:
        dout(30, "try_write done on %p\n", con);
-       ceph_socket_put(s, 0);
+       ceph_socket_put(s);
        put_connection(con);
        return;
 }
@@ -994,7 +1005,8 @@ static void process_connect(struct ceph_connection *con)
        case CEPH_MSGR_TAG_WAIT:
                dout(10, "process_connect peer connecting WAIT\n");
                set_bit(WAIT, &con->state);
-               ceph_socket_put(con->s, 1);
+               ceph_cancel_sock_callbacks(con->s);
+               ceph_socket_put(con->s);
                con->s = NULL;
                break;
        case CEPH_MSGR_TAG_READY:
@@ -1269,7 +1281,7 @@ done:
 
 out:
        dout(20, "try_read done on %p\n", con);
-       ceph_socket_put(s, 0);
+       ceph_socket_put(s);
        put_connection(con);
        return;
 }
@@ -1374,7 +1386,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
        dout(2, "destroy %p\n", msgr);
        
        /* stop listener */
-       ceph_socket_put(msgr->listen_s, 1);
+       ceph_cancel_sock_callbacks(msgr->listen_s);
+       ceph_socket_put(msgr->listen_s);
        cancel_work_sync(&msgr->awork);
 
        /* kill off connections */
@@ -1385,13 +1398,17 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
                dout(10, "destroy removing connection %p\n", con);
                set_bit(CLOSED, &con->state);
                atomic_inc(&con->nref);
+               dout(40, " get %p %d -> %d\n", con,
+                    atomic_read(&con->nref) - 1, atomic_read(&con->nref));
                __remove_connection(msgr, con);
                
                /* in case there's queued work... */
                spin_unlock(&msgr->con_lock);
                ceph_cancel_sock_callbacks(con->s);
-               cancel_work_sync(&con->rwork);
-               cancel_delayed_work_sync(&con->swork);
+               if (cancel_work_sync(&con->rwork))
+                       put_connection(con);
+               if (cancel_delayed_work_sync(&con->swork))
+                       put_connection(con);
                put_connection(con);
                dout(10, "destroy removed connection %p\n", con);