]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
init workqueues once; fixed get_connection
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 28 Nov 2007 17:43:45 +0000 (17:43 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 28 Nov 2007 17:43:45 +0000 (17:43 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2136 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/client.c
trunk/ceph/kernel/ktcp.c
trunk/ceph/kernel/messenger.c

index b1278692c0b73dd063d61f424631bd1aafa4a627..c5d9eb9298eaf079084473b1ba8e35bc7f12e326 100644 (file)
@@ -5,11 +5,45 @@
 #include <linux/random.h>
 #include "client.h"
 #include "super.h"
+#include "ktcp.h"
 
 
 /* debug level; defined in include/ceph_fs.h */
 int ceph_debug = 20;
 
+/*
+ * directory of filesystems mounted by this host
+ *
+ *   key: fsid.major ^ fsid.minor
+ * value: struct ceph_client.fsid_item
+ */
+static spinlock_t ceph_client_spinlock = SPIN_LOCK_UNLOCKED;
+static int ceph_num_clients = 0;
+
+RADIX_TREE(ceph_clients, GFP_KERNEL);
+
+static void get_client_counter(void) 
+{
+       spin_lock(&ceph_client_spinlock);
+       if (ceph_num_clients == 0) {
+               dout(1, "first client, setting up workqueues\n");
+               ceph_workqueue_init();
+       }
+       ceph_num_clients++;
+       spin_unlock(&ceph_client_spinlock);
+}
+
+static void put_client_counter(void) 
+{
+       spin_lock(&ceph_client_spinlock);
+       ceph_num_clients--;
+       if (ceph_num_clients == 0) {
+               dout(1, "last client, shutting down workqueues\n");
+               ceph_workqueue_shutdown();
+       }
+       spin_unlock(&ceph_client_spinlock);
+}
+
 
 void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg);
 
@@ -29,6 +63,7 @@ static struct ceph_client *create_client(struct ceph_mount_args *args)
        atomic_set(&cl->nref, 0);
        init_waitqueue_head(&cl->mount_wq);
        spin_lock_init(&cl->sb_lock);
+       get_client_counter();
 
        /* messenger */
        cl->msgr = ceph_messenger_create();
@@ -47,6 +82,7 @@ static struct ceph_client *create_client(struct ceph_mount_args *args)
        return cl;
 
 fail:
+       put_client_counter();
        kfree(cl);
        return ERR_PTR(err);
 }
@@ -133,15 +169,6 @@ static void handle_mon_map(struct ceph_client *client, struct ceph_msg *msg)
 
 
 /*
- * directory of filesystems mounted by this host
- *
- *   key: fsid.major ^ fsid.minor
- * value: struct ceph_client.fsid_item
- */
-
-/* ignore all this until later
-RADIX_TREE(ceph_clients, GFP_KERNEL);
-
 static struct ceph_client *get_client_fsid(struct ceph_fsid *fsid)
 {
 
@@ -190,6 +217,7 @@ void ceph_put_client(struct ceph_client *cl)
 
                /* unmount */
                /* ... */
+               put_client_counter();
                kfree(cl);
        }
 }
index 2c6692617e068be6ead1aac9c87849da0d199d7d..4dd6c3235ae0f402e773cddd6d1f2390f4a7a5db 100644 (file)
@@ -59,7 +59,7 @@ static int set_sock_callbacks(struct socket *sock, void *user_data)
 {
         struct sock *sk = sock->sk;
         sk->sk_user_data = user_data;
-        printk(KERN_INFO "Entered set_sock_callbacks\n");
+        dout(20, "set_sock_callbacks\n");
 
         /* Install callbacks */
         sk->sk_data_ready = ceph_data_ready;
@@ -80,7 +80,7 @@ int ceph_tcp_connect(struct ceph_connection *con)
 
         ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
         if (ret < 0) {
-                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+                derr(1, "ceph_tcp_connect sock_create_kern error: %d\n", ret);
                 goto done;
         }
 
@@ -92,7 +92,7 @@ int ceph_tcp_connect(struct ceph_connection *con)
         if (ret == -EINPROGRESS) return 0;
         if (ret < 0) {
                 /* TBD check for fatal errors, retry if not fatal.. */
-                printk(KERN_INFO "kernel_connect error: %d\n", ret);
+                derr(1, "ceph_tcp_connect kernel_connect error: %d\n", ret);
                 sock_release(con->sock);
                 con->sock = NULL;
         }
index 7a41ca07e9e27834b7bd95e91a43b16dcbf1853e..00d789e19f37837a59809cebbb95b9b6287b53ec 100644 (file)
@@ -93,14 +93,22 @@ static unsigned long hash_addr(struct ceph_entity_addr *addr)
  */
 static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
 {
-       struct ceph_connection *con;
+       struct ceph_connection *con = NULL;
        struct list_head *head, *p;
        unsigned long key = hash_addr(addr);
 
        /* existing? */
        spin_lock(&msgr->con_lock);
        head = radix_tree_lookup(&msgr->con_open, key);
-       if (head) {
+       if (head == NULL) 
+               goto out;
+       if (list_empty(head)) {
+               con = list_entry(head, struct ceph_connection, list_bucket);
+               if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
+                       atomic_inc(&con->nref);
+                       goto out;
+               }
+       } else {
                list_for_each(p, head) {
                        con = list_entry(p, struct ceph_connection, list_bucket);
                        if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
@@ -121,6 +129,7 @@ out:
 static void put_connection(struct ceph_connection *con) 
 {
        if (atomic_dec_and_test(&con->nref)) {
+               dout(20, "put_connection destroying %p\n", con);
                sock_release(con->sock);
                kfree(con);
        }
@@ -140,8 +149,10 @@ static void add_connection(struct ceph_messenger *msgr, struct ceph_connection *
        spin_lock(&msgr->con_lock);
        head = radix_tree_lookup(&msgr->con_open, key);
        if (head) {
+               dout(20, "add_connection %p in existing bucket %lu\n", con, key);
                list_add(&con->list_bucket, head);
        } else {
+               dout(20, "add_connection %p in new bucket %lu\n", con, key);
                INIT_LIST_HEAD(&con->list_bucket); /* empty */
                radix_tree_insert(&msgr->con_open, key, &con->list_bucket);
        }
@@ -172,8 +183,10 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio
                /* remove from con_open too */
                if (list_empty(&con->list_bucket)) {
                        /* last one */
+                       dout(20, "remove_connection %p and removing bucket %lu\n", con, key);
                        radix_tree_delete(&msgr->con_open, key);
                } else {
+                       dout(20, "remove_connection %p from bucket %lu\n", con, key);
                        list_del(&con->list_bucket);
                }
        }
@@ -742,13 +755,13 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
                con = new_connection(msgr);
                if (IS_ERR(con))
                        return PTR_ERR(con);
-               dout(5, "opening new connection to peer %x:%d\n",
+               dout(5, "ceph_msg_send opening new connection %p to peer %x:%d\n", con,
                     ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), 
                     ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
                con->peer_addr = msg->hdr.dst.addr;
                add_connection(msgr, con);
        } else {
-               dout(5, "had connection to peer %x:%d\n",
+               dout(5, "ceph_msg_send had connection %p to peer %x:%d\n", con,
                     ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
                     ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
        }                    
@@ -757,21 +770,21 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
 
        /* initiate connect? */
        if (test_bit(NEW, &con->state)) {
+               dout(5, "ceph_msg_send initiating connect on %p\n", con);
                ret = ceph_tcp_connect(con);
                if (ret < 0){
                        derr(1, "connection failure to peer %x:%d\n",
                             ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
                             ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
                        remove_connection(msgr, con);
-                       kfree(con);
+                       put_connection(con);
                        return(ret);
-
                }
        }
        
        /* queue */
-       dout(1, "queuing outgoing message for %s%d\n",
-            ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num);
+       dout(1, "ceph_msg_send queuing outgoing message for %s%d on %p\n",
+            ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con);
        ceph_msg_get(msg);
 
        list_add(&msg->list_head, &con->out_queue);