From: Sage Weil Date: Wed, 28 May 2008 03:49:27 +0000 (-0700) Subject: kclient: fixed msgr connection refcounting, and some cleanups X-Git-Tag: v0.3~177 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c684e1c915d6cfdcb7e12abf7d8f5a1e14b6bbe3;p=ceph.git kclient: fixed msgr connection refcounting, and some cleanups --- diff --git a/src/kernel/ktcp.c b/src/kernel/ktcp.c index 046861d94df7..70ecc50245fd 100644 --- a/src/kernel/ktcp.c +++ b/src/kernel/ktcp.c @@ -34,30 +34,35 @@ struct kobj_type ceph_socket_type = { struct ceph_socket *ceph_socket_create() { struct ceph_socket *s; - int err; - + int err = -ENOMEM; + s = kzalloc(sizeof(*s), GFP_NOFS); - if (!s) { - derr(10, "ENOMEM creating ceph_socket\n"); - return ERR_PTR(-ENOMEM); - } + if (!s) + goto out; err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &s->sock); - if (err) { - derr(10, "sock_create_kern error %d\n", err); - return ERR_PTR(err); - } + if (err) + goto out_free; #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25) - kobject_init_and_add(&s->kobj, &ceph_socket_type, - ceph_sockets_kobj, - "socket %p", s); + err = kobject_init_and_add(&s->kobj, &ceph_socket_type, + ceph_sockets_kobj, + "socket %p", s); + if (err) + goto out_release; #else kobject_init(&s->kobj); kobject_set_name(&s->kobj, "socket %p", s); s->kobj.ktype = &ceph_socket_type; #endif return s; + +out_release: + sock_release(s->sock); +out_free: + kfree(s); +out: + return ERR_PTR(err); } void ceph_socket_get(struct ceph_socket *s) @@ -69,13 +74,11 @@ void ceph_socket_get(struct ceph_socket *s) BUG_ON(!r); } -void ceph_socket_put(struct ceph_socket *s, int die) +void ceph_socket_put(struct ceph_socket *s) { dout(10, "socket_put %p\n", s); if (!s) return; - if (die) - ceph_cancel_sock_callbacks(s); kobject_put(&s->kobj); } @@ -216,7 +219,8 @@ struct ceph_socket *ceph_tcp_connect(struct ceph_connection *con) /* TBD check for fatal errors, retry if not fatal.. */ derr(1, "connect %u.%u.%u.%u:%u error: %d\n", IPQUADPORT(*(struct sockaddr_in *)paddr), ret); - ceph_socket_put(s, 1); + ceph_cancel_sock_callbacks(s); + ceph_socket_put(s); con->s = 0; } @@ -287,7 +291,7 @@ int ceph_tcp_listen(struct ceph_messenger *msgr) return ret; err: - ceph_socket_put(s, 0); + ceph_socket_put(s); return ret; } @@ -327,7 +331,7 @@ int ceph_tcp_accept(struct ceph_socket *ls, struct ceph_connection *con) return ret; err: - ceph_socket_put(s, 0); + ceph_socket_put(s); con->s = 0; return ret; } diff --git a/src/kernel/ktcp.h b/src/kernel/ktcp.h index 840c8d5a8865..59314f37255e 100644 --- a/src/kernel/ktcp.h +++ b/src/kernel/ktcp.h @@ -24,7 +24,7 @@ void ceph_workqueue_shutdown(void); extern struct ceph_socket *ceph_socket_create(void); extern void ceph_socket_get(struct ceph_socket *s); -extern void ceph_socket_put(struct ceph_socket *s, int die); +extern void ceph_socket_put(struct ceph_socket *s); /* Max number of outstanding connections in listener queueu */ #define NUM_BACKUP 10 diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index bd32cd321b2f..ea7d33b54b72 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -90,21 +90,21 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, /* existing? */ head = radix_tree_lookup(&msgr->con_tree, key); if (head == NULL) - goto out; + return NULL; con = list_entry(head, struct ceph_connection, list_bucket); - if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) { - atomic_inc(&con->nref); - goto out; - } + if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) + goto yes; list_for_each(p, head) { con = list_entry(p, struct ceph_connection, list_bucket); - if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) { - atomic_inc(&con->nref); - goto out; - } + if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) + goto yes; } - con = NULL; -out: + return NULL; + +yes: + atomic_inc(&con->nref); + dout(20, "get_connection %p nref = %d -> %d\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); return con; } @@ -115,12 +115,14 @@ out: */ static void put_connection(struct ceph_connection *con) { - dout(20, "put_connection nref = %d\n", atomic_read(&con->nref)); + dout(20, "put_connection %p nref = %d -> %d\n", con, + atomic_read(&con->nref), atomic_read(&con->nref) - 1); if (atomic_dec_and_test(&con->nref)) { - dout(20, "put_connection destroying %p\n", con); + dout(20, "put_connection %p destroying\n", con); ceph_msg_put_list(&con->out_queue); ceph_msg_put_list(&con->out_sent); - ceph_socket_put(con->s, 1); + ceph_cancel_sock_callbacks(con->s); + ceph_socket_put(con->s); kfree(con); } } @@ -136,6 +138,8 @@ static void __register_connection(struct ceph_messenger *msgr, /* inc ref count */ atomic_inc(&con->nref); + dout(20, "add_connection %p %d -> %d\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); if (test_and_clear_bit(ACCEPTING, &con->state)) { list_del_init(&con->list_bucket); @@ -161,6 +165,8 @@ static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con) { atomic_inc(&con->nref); + dout(20, "add_connection_accepting %p nref = %d -> %d\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); spin_lock(&msgr->con_lock); list_add(&con->list_all, &msgr->con_all); spin_unlock(&msgr->con_lock); @@ -225,8 +231,9 @@ static void remove_connection(struct ceph_messenger *msgr, */ void ceph_queue_write(struct ceph_connection *con) { - dout(40, "ceph_queue_write %p\n", con); atomic_inc(&con->nref); + dout(40, "ceph_queue_write %p %d -> %d\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); if (!queue_work(send_wq, &con->swork.work)) { dout(40, "ceph_queue_write %p - already queued\n", con); put_connection(con); @@ -235,8 +242,10 @@ void ceph_queue_write(struct ceph_connection *con) void ceph_queue_delayed_write(struct ceph_connection *con) { - dout(40, "ceph_queue_delayed_write %p delay %lu\n", con, con->delay); atomic_inc(&con->nref); + dout(40, "ceph_queue_delayed_write %p delay %lu %d -> %d\n", con, + con->delay, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); if (!queue_delayed_work(send_wq, &con->swork, round_jiffies_relative(con->delay))) { dout(40, "ceph_queue_delayed_write %p - already queued\n", con); @@ -250,8 +259,9 @@ void ceph_queue_read(struct ceph_connection *con) dout(40, "ceph_queue_read %p - already READABLE\n", con); return; } - dout(40, "ceph_queue_read %p\n", con); atomic_inc(&con->nref); + dout(40, "ceph_queue_read %p %d -> %d\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); queue_work(recv_wq, &con->rwork); } @@ -277,7 +287,8 @@ static void ceph_fault(struct ceph_connection *con) if (con->delay) { dout(30, "fault tcp_close delay != 0\n"); - ceph_socket_put(con->s, 1); + ceph_cancel_sock_callbacks(con->s); + ceph_socket_put(con->s); con->s = NULL; set_bit(NEW, &con->state); @@ -664,7 +675,7 @@ more_kvec: done: dout(30, "try_write done on %p\n", con); - ceph_socket_put(s, 0); + ceph_socket_put(s); put_connection(con); return; } @@ -994,7 +1005,8 @@ static void process_connect(struct ceph_connection *con) case CEPH_MSGR_TAG_WAIT: dout(10, "process_connect peer connecting WAIT\n"); set_bit(WAIT, &con->state); - ceph_socket_put(con->s, 1); + ceph_cancel_sock_callbacks(con->s); + ceph_socket_put(con->s); con->s = NULL; break; case CEPH_MSGR_TAG_READY: @@ -1269,7 +1281,7 @@ done: out: dout(20, "try_read done on %p\n", con); - ceph_socket_put(s, 0); + ceph_socket_put(s); put_connection(con); return; } @@ -1374,7 +1386,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) dout(2, "destroy %p\n", msgr); /* stop listener */ - ceph_socket_put(msgr->listen_s, 1); + ceph_cancel_sock_callbacks(msgr->listen_s); + ceph_socket_put(msgr->listen_s); cancel_work_sync(&msgr->awork); /* kill off connections */ @@ -1385,13 +1398,17 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) dout(10, "destroy removing connection %p\n", con); set_bit(CLOSED, &con->state); atomic_inc(&con->nref); + dout(40, " get %p %d -> %d\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); __remove_connection(msgr, con); /* in case there's queued work... */ spin_unlock(&msgr->con_lock); ceph_cancel_sock_callbacks(con->s); - cancel_work_sync(&con->rwork); - cancel_delayed_work_sync(&con->swork); + if (cancel_work_sync(&con->rwork)) + put_connection(con); + if (cancel_delayed_work_sync(&con->swork)) + put_connection(con); put_connection(con); dout(10, "destroy removed connection %p\n", con);