From e55d8503e3954c944efca94094cc691d8eb539c5 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 16 Jan 2008 11:04:32 -0800 Subject: [PATCH] rework get/add/replace connection msgr spinlocks --- src/kernel/messenger.c | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 8a2af20ead6c7..d7ef53d38ee46 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -153,14 +153,13 @@ static unsigned long hash_addr(struct ceph_entity_addr *addr) /* * get an existing connection, if any, for given addr */ -static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) +static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) { struct ceph_connection *con = NULL; struct list_head *head, *p; unsigned long key = hash_addr(addr); /* existing? */ - spin_lock(&msgr->con_lock); head = radix_tree_lookup(&msgr->con_open, key); if (head == NULL) goto out; @@ -181,10 +180,11 @@ static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struc } con = NULL; out: - spin_unlock(&msgr->con_lock); return con; } + + /* * drop a reference */ @@ -203,7 +203,7 @@ static void put_connection(struct ceph_connection *con) /* * add to connections tree */ -static void add_connection(struct ceph_messenger *msgr, struct ceph_connection *con) +static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection *con) { struct list_head *head; unsigned long key = hash_addr(&con->peer_addr); @@ -211,7 +211,6 @@ static void add_connection(struct ceph_messenger *msgr, struct ceph_connection * /* inc ref count */ atomic_inc(&con->nref); - spin_lock(&msgr->con_lock); /* PW this is bogus... needs to be readdressed later */ if (test_bit(ACCEPTING, &con->state)) { list_del(&con->list_bucket); @@ -229,7 +228,6 @@ static void add_connection(struct ceph_messenger *msgr, struct ceph_connection * INIT_LIST_HEAD(&con->list_bucket); /* empty */ radix_tree_insert(&msgr->con_open, key, &con->list_bucket); } - spin_unlock(&msgr->con_lock); } static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con) @@ -279,7 +277,7 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio * replace another connection * (old and new should be for the _same_ peer, and thus in the same pos in the radix tree) */ -static void replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new) +static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new) { spin_lock(&msgr->con_lock); list_add(&new->list_bucket, &old->list_bucket); @@ -479,7 +477,7 @@ more: set_bit(CONNECTING, &con->state); dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state); ret = ceph_tcp_connect(con); - dout(30, "try_write returned from connect ret = %d state = %lu", ret, con->state); + dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state); if (ret < 0) { /* fault */ derr(1, "try_write connect error\n"); @@ -782,19 +780,20 @@ static int read_accept_partial(struct ceph_connection *con) static void process_accept(struct ceph_connection *con) { struct ceph_connection *existing; + struct ceph_messenger *msgr = con->msgr; /* do we already have a connection for this peer? */ - spin_lock(&con->msgr->con_lock); - existing = get_connection(con->msgr, &con->peer_addr); + spin_lock(&msgr->con_lock); + existing = __get_connection(msgr, &con->peer_addr); if (existing) { //spin_lock(&existing->lock); /* replace existing connection? */ if ((test_bit(CONNECTING, &existing->state) && - ceph_entity_addr_equal(&con->msgr->inst.addr, &con->peer_addr)) || + ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) || (test_bit(OPEN, &existing->state) && con->connect_seq == existing->connect_seq)) { /* replace existing with new connection */ - replace_connection(con->msgr, existing, con); + __replace_connection(msgr, existing, con); /* steal message queue */ list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */ con->out_seq = existing->out_seq; @@ -809,10 +808,10 @@ static void process_accept(struct ceph_connection *con) //spin_unlock(&existing->lock); put_connection(existing); } else { - add_connection(con->msgr, con); + __add_connection(msgr, con); set_bit(OPEN, &con->state); } - spin_unlock(&con->msgr->con_lock); + spin_unlock(&msgr->con_lock); /* the result? */ clear_bit(ACCEPTING, &con->state); @@ -1035,7 +1034,8 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, msg->hdr.src = msgr->inst; /* do we have the connection? */ - con = get_connection(msgr, &msg->hdr.dst.addr); + spin_lock(&msgr->con_lock); + con = __get_connection(msgr, &msg->hdr.dst.addr); if (!con) { con = new_connection(msgr); if (IS_ERR(con)) @@ -1044,12 +1044,13 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); con->peer_addr = msg->hdr.dst.addr; - add_connection(msgr, con); + __add_connection(msgr, con); } else { dout(10, "ceph_msg_send had connection %p to peer %x:%d\n", con, ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); - } + } + spin_unlock(&msgr->con_lock); con->delay = timeout; dout(10, "ceph_msg_send delay = %lu\n", con->delay); /* queue */ -- 2.39.5