git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: add explicit connection-based api
author Sage Weil <sage@newdream.net>
Wed, 26 Aug 2009 18:02:13 +0000 (11:02 -0700)
committer Sage Weil <sage@newdream.net>
Wed, 26 Aug 2009 20:09:09 +0000 (13:09 -0700)
src/kernel/messenger.c
src/kernel/messenger.h

index e95ebd664b9111128126b175a0ba25377b61eb55..c855baa631b348c6c2c48e06b972354703575443 100644 (file)
@@ -216,23 +216,6 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
 }
 
 
-/*
- * initialize a new connection
- */
-void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
-                  struct ceph_entity_addr *addr)
-{
-       con->msgr = msgr;
-       atomic_set(&con->nref, 1);
-       INIT_LIST_HEAD(&con->list_all);
-       INIT_LIST_HEAD(&con->list_bucket);
-       spin_lock_init(&con->out_queue_lock);
-       INIT_LIST_HEAD(&con->out_queue);
-       INIT_LIST_HEAD(&con->out_sent);
-       INIT_DELAYED_WORK(&con->work, con_work);
-       con->peer_addr = *addr;
-}
-
 /*
  * The con_tree radix_tree has an unsigned long key and void * value.
  * Since ceph_entity_addr is bigger than that, we use a trivial hash
@@ -274,7 +257,7 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
        return NULL;
 
 yes:
-       atomic_inc(&con->nref);
+       con->get(con);
        dout("get_connection %p nref = %d -> %d\n", con,
             atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        return con;
@@ -298,23 +281,61 @@ static int con_close_socket(struct ceph_connection *con)
 }
 
 /*
- * drop a reference
+ * clean up connection state
+ */
+static void ceph_con_destroy(struct ceph_connection *con)
+{
+       dout("con_destroy %p destroying\n", con);
+       ceph_msg_put_list(&con->out_queue);
+       ceph_msg_put_list(&con->out_sent);
+       set_bit(CLOSED, &con->state);
+       con_close_socket(con); /* silently ignore errors */
+}
+
+/*
+ * generic get/put
  */
+static void get_connection(struct ceph_connection *con)
+{
+       dout("get_connection %p nref = %d -> %d\n", con,
+            atomic_read(&con->nref), atomic_read(&con->nref) + 1);
+       atomic_inc(&con->nref);
+}
+
 static void put_connection(struct ceph_connection *con)
 {
        dout("put_connection %p nref = %d -> %d\n", con,
             atomic_read(&con->nref), atomic_read(&con->nref) - 1);
        BUG_ON(atomic_read(&con->nref) == 0);
        if (atomic_dec_and_test(&con->nref)) {
-               dout("put_connection %p destroying\n", con);
-               ceph_msg_put_list(&con->out_queue);
-               ceph_msg_put_list(&con->out_sent);
-               set_bit(CLOSED, &con->state);
-               con_close_socket(con); /* silently ignore possible errors */
+               ceph_con_destroy(con);
                kfree(con);
        }
 }
 
+/*
+ * initialize a new connection.
+ *
+ * NOTE: assumes struct is initially zeroed!
+ */
+void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
+                  struct ceph_entity_addr *addr)
+{
+       atomic_set(&con->nref, 1);
+       con->msgr = msgr;
+       INIT_LIST_HEAD(&con->list_all);
+       INIT_LIST_HEAD(&con->list_bucket);
+       spin_lock_init(&con->out_queue_lock);
+       INIT_LIST_HEAD(&con->out_queue);
+       INIT_LIST_HEAD(&con->out_sent);
+       INIT_DELAYED_WORK(&con->work, con_work);
+       con->peer_addr = *addr;
+
+       con->private = NULL;
+       con->get = get_connection;
+       con->put = put_connection;
+}
+
 /*
  * add a connection to the con_tree.
  *
@@ -329,7 +350,7 @@ static int __register_connection(struct ceph_messenger *msgr,
 
        dout("register_connection %p %d -> %d\n", con,
             atomic_read(&con->nref), atomic_read(&con->nref) + 1);
-       atomic_inc(&con->nref);
+       con->get(con);
 
        list_add(&con->list_all, &msgr->con_all);
 
@@ -345,7 +366,7 @@ static int __register_connection(struct ceph_messenger *msgr,
                rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket);
                if (rc < 0) {
                        list_del(&con->list_all);
-                       put_connection(con);
+                       con->put(con);
                        return rc;
                }
        }
@@ -395,7 +416,7 @@ static void __remove_connection(struct ceph_messenger *msgr,
                        list_del_init(&con->list_bucket);
                }
        }
-       put_connection(con);
+       con->put(con);
 }
 
 static void remove_connection(struct ceph_messenger *msgr,
@@ -1515,15 +1536,15 @@ static void ceph_queue_con(struct ceph_connection *con)
                return;
        }
 
-       atomic_inc(&con->nref);
-       dout("ceph_queue_con %p %d -> %d\n", con,
-            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
+       con->get(con);
 
        set_bit(QUEUED, &con->state);
        if (test_bit(BUSY, &con->state) ||
            !queue_work(ceph_msgr_wq, &con->work.work)) {
                dout("ceph_queue_con %p - already BUSY or queued\n", con);
-               put_connection(con);
+               con->put(con);
+       } else {
+               dout("ceph_queue_con %p\n", con);
        }
 }
 
@@ -1574,7 +1595,7 @@ done:
        dout("con_work %p done\n", con);
 
 out:
-       put_connection(con);
+       con->put(con);
 }
 
 
@@ -1624,7 +1645,7 @@ static void ceph_fault(struct ceph_connection *con)
        dout("fault queueing %p %d -> %d delay %lu\n", con,
             atomic_read(&con->nref), atomic_read(&con->nref) + 1,
             con->delay);
-       atomic_inc(&con->nref);
+       con->get(con);
        queue_delayed_work(ceph_msgr_wq, &con->work,
                           round_jiffies_relative(con->delay));
 }
@@ -1660,7 +1681,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        if (myaddr) {
                msgr->inst.addr = *myaddr;
        } else {
-               dout("create my ip not specified, binding to INADDR_ANY\n");
+               dout("create ip not specified, initially INADDR_ANY\n");
                msgr->inst.addr.ipaddr.sin_addr.s_addr = htonl(INADDR_ANY);
                msgr->inst.addr.ipaddr.sin_port = htons(0);  /* any port */
        }
@@ -1686,17 +1707,15 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
                                 list_all);
                dout("destroy removing connection %p\n", con);
                set_bit(CLOSED, &con->state);
-               atomic_inc(&con->nref);
-               dout(" get %p %d -> %d\n", con,
-                    atomic_read(&con->nref) - 1, atomic_read(&con->nref));
+               con->get(con);
                __remove_connection(msgr, con);
 
                /* in case there's queued work.  drop a reference if
                 * we successfully cancel work. */
                spin_unlock(&msgr->con_lock);
                if (cancel_delayed_work_sync(&con->work))
-                       put_connection(con);
-               put_connection(con);
+                       con->put(con);
+               con->put(con);
                dout("destroy removed connection %p\n", con);
 
                spin_lock(&msgr->con_lock);
@@ -1713,6 +1732,13 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
 /*
  * mark a peer down.  drop any open connections.
  */
+void ceph_con_close(struct ceph_connection *con)
+{
+       dout("close %p peer %u.%u.%u.%u:%u\n", con,
+            IPQUADPORT(con->peer_addr.ipaddr));
+       set_bit(CLOSED, &con->state);  /* in case there's queued work */
+}
+
 void ceph_messenger_mark_down(struct ceph_messenger *msgr,
                              struct ceph_entity_addr *addr)
 {
@@ -1732,10 +1758,9 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr,
        }
        spin_unlock(&msgr->con_lock);
        if (con)
-               put_connection(con);
+               con->put(con);
 }
 
-
 /*
  * A single ceph_msg can't be queued for send twice, unless it's
  * already been delivered (i.e. we have the only remaining reference),
@@ -1816,7 +1841,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                spin_lock(&msgr->con_lock);
                con = __get_connection(msgr, &msg->hdr.dst.addr);
                if (con) {
-                       put_connection(newcon);
+                       con->put(newcon);
                        dout("ceph_msg_send (lost race and) had connection "
                             "%p to peer %u.%u.%u.%u:%u\n", con,
                             IPQUADPORT(msg->hdr.dst.addr.ipaddr));
@@ -1870,7 +1895,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                ceph_queue_con(con);
 
-       put_connection(con);
+       con->put(con);
        dout("ceph_msg_send done\n");
        return ret;
 }
@@ -1883,6 +1908,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        /* set source */
        msg->hdr.src = con->msgr->inst;
        msg->hdr.orig_src = con->msgr->inst;
+       msg->hdr.dst.addr = con->peer_addr;
+       msg->hdr.dst.name = con->peer_name;
 
        /* queue */
        spin_lock(&con->out_queue_lock);
index 366f415fae01c62546d0ad23279c5cff338663cb..fb56763d9009a83ff0399d970d295fbc68d9a7dd 100644 (file)
@@ -12,6 +12,7 @@
 #include "buffer.h"
 
 struct ceph_msg;
+struct ceph_connection;
 
 #define IPQUADPORT(n)                                                  \
        (unsigned int)((be32_to_cpu((n).sin_addr.s_addr) >> 24)) & 0xFF, \
@@ -41,6 +42,8 @@ typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p,
                                            struct ceph_msg_header *hdr);
 typedef int (*ceph_msgr_alloc_middle_t) (void *p, struct ceph_msg *msg);
 
+typedef void (*ceph_con_get_t)(struct ceph_connection *);
+typedef void (*ceph_con_put_t)(struct ceph_connection *);
 
 static inline const char *ceph_name_type_str(int t)
 {
@@ -147,14 +150,15 @@ struct ceph_msg_pos {
  */
 struct ceph_connection {
        void *private;
+       ceph_con_get_t get;
+       ceph_con_put_t put;
+       atomic_t nref;
 
        struct ceph_messenger *msgr;
        struct socket *sock;
        unsigned long state;    /* connection state (see flags above) */
        const char *error_msg;  /* error message, if any */
 
-       atomic_t nref;
-
        struct list_head list_all;     /* msgr->con_all */
        struct list_head list_bucket;
 
@@ -224,7 +228,8 @@ extern void ceph_con_init(struct ceph_messenger *msgr,
                          struct ceph_connection *con,
                          struct ceph_entity_addr *addr);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
-
+extern void ceph_con_keepalive(struct ceph_connection *con);
+extern void ceph_con_close(struct ceph_connection *con);
 
 extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
                                     struct ceph_entity_addr *addr);