From c8bc1c36798be963a3e14dcb6ec3b9a2a5119b5e Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 28 Nov 2007 17:43:45 +0000 Subject: [PATCH] init workqueues once; fixed get_connection git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2136 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/client.c | 46 ++++++++++++++++++++++++++++------- trunk/ceph/kernel/ktcp.c | 6 ++--- trunk/ceph/kernel/messenger.c | 29 ++++++++++++++++------ 3 files changed, 61 insertions(+), 20 deletions(-) diff --git a/trunk/ceph/kernel/client.c b/trunk/ceph/kernel/client.c index b1278692c0b73..c5d9eb9298eaf 100644 --- a/trunk/ceph/kernel/client.c +++ b/trunk/ceph/kernel/client.c @@ -5,11 +5,45 @@ #include #include "client.h" #include "super.h" +#include "ktcp.h" /* debug level; defined in include/ceph_fs.h */ int ceph_debug = 20; +/* + * directory of filesystems mounted by this host + * + * key: fsid.major ^ fsid.minor + * value: struct ceph_client.fsid_item + */ +static spinlock_t ceph_client_spinlock = SPIN_LOCK_UNLOCKED; +static int ceph_num_clients = 0; + +RADIX_TREE(ceph_clients, GFP_KERNEL); + +static void get_client_counter(void) +{ + spin_lock(&ceph_client_spinlock); + if (ceph_num_clients == 0) { + dout(1, "first client, setting up workqueues\n"); + ceph_workqueue_init(); + } + ceph_num_clients++; + spin_unlock(&ceph_client_spinlock); +} + +static void put_client_counter(void) +{ + spin_lock(&ceph_client_spinlock); + ceph_num_clients--; + if (ceph_num_clients == 0) { + dout(1, "last client, shutting down workqueues\n"); + ceph_workqueue_shutdown(); + } + spin_unlock(&ceph_client_spinlock); +} + void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg); @@ -29,6 +63,7 @@ static struct ceph_client *create_client(struct ceph_mount_args *args) atomic_set(&cl->nref, 0); init_waitqueue_head(&cl->mount_wq); spin_lock_init(&cl->sb_lock); + get_client_counter(); /* messenger */ cl->msgr = ceph_messenger_create(); @@ -47,6 +82,7 @@ static struct ceph_client *create_client(struct ceph_mount_args *args) return cl; fail: + put_client_counter(); kfree(cl); return ERR_PTR(err); } @@ -133,15 +169,6 @@ static void handle_mon_map(struct ceph_client *client, struct ceph_msg *msg) /* - * directory of filesystems mounted by this host - * - * key: fsid.major ^ fsid.minor - * value: struct ceph_client.fsid_item - */ - -/* ignore all this until later -RADIX_TREE(ceph_clients, GFP_KERNEL); - static struct ceph_client *get_client_fsid(struct ceph_fsid *fsid) { @@ -190,6 +217,7 @@ void ceph_put_client(struct ceph_client *cl) /* unmount */ /* ... */ + put_client_counter(); kfree(cl); } } diff --git a/trunk/ceph/kernel/ktcp.c b/trunk/ceph/kernel/ktcp.c index 2c6692617e068..4dd6c3235ae0f 100644 --- a/trunk/ceph/kernel/ktcp.c +++ b/trunk/ceph/kernel/ktcp.c @@ -59,7 +59,7 @@ static int set_sock_callbacks(struct socket *sock, void *user_data) { struct sock *sk = sock->sk; sk->sk_user_data = user_data; - printk(KERN_INFO "Entered set_sock_callbacks\n"); + dout(20, "set_sock_callbacks\n"); /* Install callbacks */ sk->sk_data_ready = ceph_data_ready; @@ -80,7 +80,7 @@ int ceph_tcp_connect(struct ceph_connection *con) ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock); if (ret < 0) { - printk(KERN_INFO "sock_create_kern error: %d\n", ret); + derr(1, "ceph_tcp_connect sock_create_kern error: %d\n", ret); goto done; } @@ -92,7 +92,7 @@ int ceph_tcp_connect(struct ceph_connection *con) if (ret == -EINPROGRESS) return 0; if (ret < 0) { /* TBD check for fatal errors, retry if not fatal.. */ - printk(KERN_INFO "kernel_connect error: %d\n", ret); + derr(1, "ceph_tcp_connect kernel_connect error: %d\n", ret); sock_release(con->sock); con->sock = NULL; } diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 7a41ca07e9e27..00d789e19f378 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -93,14 +93,22 @@ static unsigned long hash_addr(struct ceph_entity_addr *addr) */ static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) { - struct ceph_connection *con; + struct ceph_connection *con = NULL; struct list_head *head, *p; unsigned long key = hash_addr(addr); /* existing? */ spin_lock(&msgr->con_lock); head = radix_tree_lookup(&msgr->con_open, key); - if (head) { + if (head == NULL) + goto out; + if (list_empty(head)) { + con = list_entry(head, struct ceph_connection, list_bucket); + if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) { + atomic_inc(&con->nref); + goto out; + } + } else { list_for_each(p, head) { con = list_entry(p, struct ceph_connection, list_bucket); if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) { @@ -121,6 +129,7 @@ out: static void put_connection(struct ceph_connection *con) { if (atomic_dec_and_test(&con->nref)) { + dout(20, "put_connection destroying %p\n", con); sock_release(con->sock); kfree(con); } @@ -140,8 +149,10 @@ static void add_connection(struct ceph_messenger *msgr, struct ceph_connection * spin_lock(&msgr->con_lock); head = radix_tree_lookup(&msgr->con_open, key); if (head) { + dout(20, "add_connection %p in existing bucket %lu\n", con, key); list_add(&con->list_bucket, head); } else { + dout(20, "add_connection %p in new bucket %lu\n", con, key); INIT_LIST_HEAD(&con->list_bucket); /* empty */ radix_tree_insert(&msgr->con_open, key, &con->list_bucket); } @@ -172,8 +183,10 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio /* remove from con_open too */ if (list_empty(&con->list_bucket)) { /* last one */ + dout(20, "remove_connection %p and removing bucket %lu\n", con, key); radix_tree_delete(&msgr->con_open, key); } else { + dout(20, "remove_connection %p from bucket %lu\n", con, key); list_del(&con->list_bucket); } } @@ -742,13 +755,13 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) con = new_connection(msgr); if (IS_ERR(con)) return PTR_ERR(con); - dout(5, "opening new connection to peer %x:%d\n", + dout(5, "ceph_msg_send opening new connection %p to peer %x:%d\n", con, ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); con->peer_addr = msg->hdr.dst.addr; add_connection(msgr, con); } else { - dout(5, "had connection to peer %x:%d\n", + dout(5, "ceph_msg_send had connection %p to peer %x:%d\n", con, ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); } @@ -757,21 +770,21 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) /* initiate connect? */ if (test_bit(NEW, &con->state)) { + dout(5, "ceph_msg_send initiating connect on %p\n", con); ret = ceph_tcp_connect(con); if (ret < 0){ derr(1, "connection failure to peer %x:%d\n", ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); remove_connection(msgr, con); - kfree(con); + put_connection(con); return(ret); - } } /* queue */ - dout(1, "queuing outgoing message for %s%d\n", - ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num); + dout(1, "ceph_msg_send queuing outgoing message for %s%d on %p\n", + ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con); ceph_msg_get(msg); list_add(&msg->list_head, &con->out_queue); -- 2.39.5