From: Sage Weil Date: Wed, 26 Aug 2009 18:02:13 +0000 (-0700) Subject: kclient: add explicit connection-based api X-Git-Tag: v0.14~117 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f2494d1a2d4a05ffc669648d506380d5c99eaf6d;p=ceph.git kclient: add explicit connection-based api --- diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index e95ebd664b91..c855baa631b3 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -216,23 +216,6 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, } -/* - * initialize a new connection - */ -void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con, - struct ceph_entity_addr *addr) -{ - con->msgr = msgr; - atomic_set(&con->nref, 1); - INIT_LIST_HEAD(&con->list_all); - INIT_LIST_HEAD(&con->list_bucket); - spin_lock_init(&con->out_queue_lock); - INIT_LIST_HEAD(&con->out_queue); - INIT_LIST_HEAD(&con->out_sent); - INIT_DELAYED_WORK(&con->work, con_work); - con->peer_addr = *addr; -} - /* * The con_tree radix_tree has an unsigned long key and void * value. * Since ceph_entity_addr is bigger than that, we use a trivial hash @@ -274,7 +257,7 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, return NULL; yes: - atomic_inc(&con->nref); + con->get(con); dout("get_connection %p nref = %d -> %d\n", con, atomic_read(&con->nref) - 1, atomic_read(&con->nref)); return con; @@ -298,23 +281,61 @@ static int con_close_socket(struct ceph_connection *con) } /* - * drop a reference + * clean up connection state + */ +static void ceph_con_destroy(struct ceph_connection *con) +{ + dout("con_destroy %p destroying\n", con); + ceph_msg_put_list(&con->out_queue); + ceph_msg_put_list(&con->out_sent); + set_bit(CLOSED, &con->state); + con_close_socket(con); /* silently ignore errors */ +} + +/* + * generic get/put */ +static void get_connection(struct ceph_connection *con) +{ + dout("get_connection %p nref = %d -> %d\n", con, + atomic_read(&con->nref), atomic_read(&con->nref) + 1); + atomic_inc(&con->nref); +} + static void put_connection(struct ceph_connection *con) { dout("put_connection %p nref = %d -> %d\n", con, atomic_read(&con->nref), atomic_read(&con->nref) - 1); BUG_ON(atomic_read(&con->nref) == 0); if (atomic_dec_and_test(&con->nref)) { - dout("put_connection %p destroying\n", con); - ceph_msg_put_list(&con->out_queue); - ceph_msg_put_list(&con->out_sent); - set_bit(CLOSED, &con->state); - con_close_socket(con); /* silently ignore possible errors */ + ceph_con_destroy(con); kfree(con); } } +/* + * initialize a new connection. + * + * NOTE: assumes struct is initially zeroed! + */ +void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con, + struct ceph_entity_addr *addr) +{ + atomic_set(&con->nref, 1); + con->msgr = msgr; + INIT_LIST_HEAD(&con->list_all); + INIT_LIST_HEAD(&con->list_bucket); + spin_lock_init(&con->out_queue_lock); + INIT_LIST_HEAD(&con->out_queue); + INIT_LIST_HEAD(&con->out_sent); + INIT_DELAYED_WORK(&con->work, con_work); + con->peer_addr = *addr; + + con->private = NULL; + con->get = get_connection; + con->put = put_connection; +} + /* * add a connection to the con_tree. * @@ -329,7 +350,7 @@ static int __register_connection(struct ceph_messenger *msgr, dout("register_connection %p %d -> %d\n", con, atomic_read(&con->nref), atomic_read(&con->nref) + 1); - atomic_inc(&con->nref); + con->get(con); list_add(&con->list_all, &msgr->con_all); @@ -345,7 +366,7 @@ static int __register_connection(struct ceph_messenger *msgr, rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket); if (rc < 0) { list_del(&con->list_all); - put_connection(con); + con->put(con); return rc; } } @@ -395,7 +416,7 @@ static void __remove_connection(struct ceph_messenger *msgr, list_del_init(&con->list_bucket); } } - put_connection(con); + con->put(con); } static void remove_connection(struct ceph_messenger *msgr, @@ -1515,15 +1536,15 @@ static void ceph_queue_con(struct ceph_connection *con) return; } - atomic_inc(&con->nref); - dout("ceph_queue_con %p %d -> %d\n", con, - atomic_read(&con->nref) - 1, atomic_read(&con->nref)); + con->get(con); set_bit(QUEUED, &con->state); if (test_bit(BUSY, &con->state) || !queue_work(ceph_msgr_wq, &con->work.work)) { dout("ceph_queue_con %p - already BUSY or queued\n", con); - put_connection(con); + con->put(con); + } else { + dout("ceph_queue_con %p\n", con); } } @@ -1574,7 +1595,7 @@ done: dout("con_work %p done\n", con); out: - put_connection(con); + con->put(con); } @@ -1624,7 +1645,7 @@ static void ceph_fault(struct ceph_connection *con) dout("fault queueing %p %d -> %d delay %lu\n", con, atomic_read(&con->nref), atomic_read(&con->nref) + 1, con->delay); - atomic_inc(&con->nref); + con->get(con); queue_delayed_work(ceph_msgr_wq, &con->work, round_jiffies_relative(con->delay)); } @@ -1660,7 +1681,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) if (myaddr) { msgr->inst.addr = *myaddr; } else { - dout("create my ip not specified, binding to INADDR_ANY\n"); + dout("create ip not specified, initially INADDR_ANY\n"); msgr->inst.addr.ipaddr.sin_addr.s_addr = htonl(INADDR_ANY); msgr->inst.addr.ipaddr.sin_port = htons(0); /* any port */ } @@ -1686,17 +1707,15 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) list_all); dout("destroy removing connection %p\n", con); set_bit(CLOSED, &con->state); - atomic_inc(&con->nref); - dout(" get %p %d -> %d\n", con, - atomic_read(&con->nref) - 1, atomic_read(&con->nref)); + con->get(con); __remove_connection(msgr, con); /* in case there's queued work. drop a reference if * we successfully cancel work. */ spin_unlock(&msgr->con_lock); if (cancel_delayed_work_sync(&con->work)) - put_connection(con); - put_connection(con); + con->put(con); + con->put(con); dout("destroy removed connection %p\n", con); spin_lock(&msgr->con_lock); @@ -1713,6 +1732,13 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) /* * mark a peer down. drop any open connections. */ +void ceph_con_close(struct ceph_connection *con) +{ + dout("close %p peer %u.%u.%u.%u:%u\n", con, + IPQUADPORT(con->peer_addr.ipaddr)); + set_bit(CLOSED, &con->state); /* in case there's queued work */ +} + void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) { @@ -1732,10 +1758,9 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, } spin_unlock(&msgr->con_lock); if (con) - put_connection(con); + con->put(con); } - /* * A single ceph_msg can't be queued for send twice, unless it's * already been delivered (i.e. we have the only remaining reference), @@ -1816,7 +1841,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, spin_lock(&msgr->con_lock); con = __get_connection(msgr, &msg->hdr.dst.addr); if (con) { - put_connection(newcon); + con->put(newcon); dout("ceph_msg_send (lost race and) had connection " "%p to peer %u.%u.%u.%u:%u\n", con, IPQUADPORT(msg->hdr.dst.addr.ipaddr)); @@ -1870,7 +1895,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) ceph_queue_con(con); - put_connection(con); + con->put(con); dout("ceph_msg_send done\n"); return ret; } @@ -1883,6 +1908,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* set source */ msg->hdr.src = con->msgr->inst; msg->hdr.orig_src = con->msgr->inst; + msg->hdr.dst.addr = con->peer_addr; + msg->hdr.dst.name = con->peer_name; /* queue */ spin_lock(&con->out_queue_lock); diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 366f415fae01..fb56763d9009 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -12,6 +12,7 @@ #include "buffer.h" struct ceph_msg; +struct ceph_connection; #define IPQUADPORT(n) \ (unsigned int)((be32_to_cpu((n).sin_addr.s_addr) >> 24)) & 0xFF, \ @@ -41,6 +42,8 @@ typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p, struct ceph_msg_header *hdr); typedef int (*ceph_msgr_alloc_middle_t) (void *p, struct ceph_msg *msg); +typedef void (*ceph_con_get_t)(struct ceph_connection *); +typedef void (*ceph_con_put_t)(struct ceph_connection *); static inline const char *ceph_name_type_str(int t) { @@ -147,14 +150,15 @@ struct ceph_msg_pos { */ struct ceph_connection { void *private; + ceph_con_get_t get; + ceph_con_put_t put; + atomic_t nref; struct ceph_messenger *msgr; struct socket *sock; unsigned long state; /* connection state (see flags above) */ const char *error_msg; /* error message, if any */ - atomic_t nref; - struct list_head list_all; /* msgr->con_all */ struct list_head list_bucket; @@ -224,7 +228,8 @@ extern void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con, struct ceph_entity_addr *addr); extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); - +extern void ceph_con_keepalive(struct ceph_connection *con); +extern void ceph_con_close(struct ceph_connection *con); extern void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr);