/*
* get an existing connection, if any, for given addr
*/
-static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
+static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
{
struct ceph_connection *con = NULL;
struct list_head *head, *p;
unsigned long key = hash_addr(addr);
/* existing? */
- spin_lock(&msgr->con_lock);
head = radix_tree_lookup(&msgr->con_open, key);
if (head == NULL)
goto out;
}
con = NULL;
out:
- spin_unlock(&msgr->con_lock);
return con;
}
+
+
/*
* drop a reference
*/
/*
* add to connections tree
*/
-static void add_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
{
struct list_head *head;
unsigned long key = hash_addr(&con->peer_addr);
/* inc ref count */
atomic_inc(&con->nref);
- spin_lock(&msgr->con_lock);
/* PW this is bogus... needs to be readdressed later */
if (test_bit(ACCEPTING, &con->state)) {
list_del(&con->list_bucket);
INIT_LIST_HEAD(&con->list_bucket); /* empty */
radix_tree_insert(&msgr->con_open, key, &con->list_bucket);
}
- spin_unlock(&msgr->con_lock);
}
static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con)
* replace another connection
* (old and new should be for the _same_ peer, and thus in the same pos in the radix tree)
*/
-static void replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
+static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
{
spin_lock(&msgr->con_lock);
list_add(&new->list_bucket, &old->list_bucket);
set_bit(CONNECTING, &con->state);
dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state);
ret = ceph_tcp_connect(con);
- dout(30, "try_write returned from connect ret = %d state = %lu", ret, con->state);
+ dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state);
if (ret < 0) {
/* fault */
derr(1, "try_write connect error\n");
static void process_accept(struct ceph_connection *con)
{
struct ceph_connection *existing;
+ struct ceph_messenger *msgr = con->msgr;
/* do we already have a connection for this peer? */
- spin_lock(&con->msgr->con_lock);
- existing = get_connection(con->msgr, &con->peer_addr);
+ spin_lock(&msgr->con_lock);
+ existing = __get_connection(msgr, &con->peer_addr);
if (existing) {
//spin_lock(&existing->lock);
/* replace existing connection? */
if ((test_bit(CONNECTING, &existing->state) &&
- ceph_entity_addr_equal(&con->msgr->inst.addr, &con->peer_addr)) ||
+ ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) ||
(test_bit(OPEN, &existing->state) &&
con->connect_seq == existing->connect_seq)) {
/* replace existing with new connection */
- replace_connection(con->msgr, existing, con);
+ __replace_connection(msgr, existing, con);
/* steal message queue */
list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */
con->out_seq = existing->out_seq;
//spin_unlock(&existing->lock);
put_connection(existing);
} else {
- add_connection(con->msgr, con);
+ __add_connection(msgr, con);
set_bit(OPEN, &con->state);
}
- spin_unlock(&con->msgr->con_lock);
+ spin_unlock(&msgr->con_lock);
/* the result? */
clear_bit(ACCEPTING, &con->state);
msg->hdr.src = msgr->inst;
/* do we have the connection? */
- con = get_connection(msgr, &msg->hdr.dst.addr);
+ spin_lock(&msgr->con_lock);
+ con = __get_connection(msgr, &msg->hdr.dst.addr);
if (!con) {
con = new_connection(msgr);
if (IS_ERR(con))
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
con->peer_addr = msg->hdr.dst.addr;
- add_connection(msgr, con);
+ __add_connection(msgr, con);
} else {
dout(10, "ceph_msg_send had connection %p to peer %x:%d\n", con,
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
- }
+ }
+ spin_unlock(&msgr->con_lock);
con->delay = timeout;
dout(10, "ceph_msg_send delay = %lu\n", con->delay);
/* queue */