From 0b1bd64209c14aa4eaa1007ebd4f4dbefc8be616 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 24 Apr 2008 10:28:42 -0700 Subject: [PATCH] kclient: some msgr cleanups --- src/kernel/messenger.c | 45 +++++++++++++++++++++--------------------- src/kernel/messenger.h | 7 ++++--- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 995a893ca0c4c..ce7e940fb5f9e 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -86,7 +86,7 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, unsigned long key = hash_addr(addr); /* existing? */ - head = radix_tree_lookup(&msgr->con_open, key); + head = radix_tree_lookup(&msgr->con_tree, key); if (head == NULL) goto out; con = list_entry(head, struct ceph_connection, list_bucket); @@ -126,8 +126,8 @@ static void put_connection(struct ceph_connection *con) /* * add to connections tree */ -static void __add_connection(struct ceph_messenger *msgr, - struct ceph_connection *con) +static void __register_connection(struct ceph_messenger *msgr, + struct ceph_connection *con) { struct list_head *head; unsigned long key = hash_addr(&con->peer_addr); @@ -138,11 +138,10 @@ static void __add_connection(struct ceph_messenger *msgr, if (test_and_clear_bit(ACCEPTING, &con->state)) { list_del_init(&con->list_bucket); put_connection(con); - } else { + } else list_add(&con->list_all, &msgr->con_all); - } - head = radix_tree_lookup(&msgr->con_open, key); + head = radix_tree_lookup(&msgr->con_tree, key); if (head) { dout(20, "add_connection %p in existing bucket %lu head %p\n", con, key, head); @@ -151,8 +150,9 @@ static void __add_connection(struct ceph_messenger *msgr, dout(20, "add_connection %p in new bucket %lu head %p\n", con, key, &con->list_bucket); INIT_LIST_HEAD(&con->list_bucket); /* empty */ - radix_tree_insert(&msgr->con_open, key, &con->list_bucket); + radix_tree_insert(&msgr->con_tree, key, &con->list_bucket); } + set_bit(REGISTERED, &con->state); } static void add_connection_accepting(struct ceph_messenger *msgr, @@ -161,13 +161,12 @@ static void add_connection_accepting(struct ceph_messenger *msgr, atomic_inc(&con->nref); spin_lock(&msgr->con_lock); list_add(&con->list_all, &msgr->con_all); - list_add(&con->list_bucket, &msgr->con_accepting); spin_unlock(&msgr->con_lock); } /* * remove connection from all list. - * also, from con_open radix tree, if it should have been there + * also, from con_tree radix tree, if it should have been there */ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con) @@ -181,17 +180,16 @@ static void __remove_connection(struct ceph_messenger *msgr, return; } list_del_init(&con->list_all); - if (test_bit(CONNECTING, &con->state) || - test_bit(OPEN, &con->state)) { - /* remove from con_open too */ + if (test_bit(REGISTERED, &con->state)) { + /* remove from con_tree too */ key = hash_addr(&con->peer_addr); if (list_empty(&con->list_bucket)) { /* last one */ dout(20, "__remove_connection %p and bucket %lu\n", con, key); - radix_tree_delete(&msgr->con_open, key); + radix_tree_delete(&msgr->con_tree, key); } else { - slot = radix_tree_lookup_slot(&msgr->con_open, key); + slot = radix_tree_lookup_slot(&msgr->con_tree, key); val = radix_tree_deref_slot(slot); dout(20, "__remove_connection %p from bucket %lu " "head %p\n", con, key, val); @@ -205,6 +203,8 @@ static void __remove_connection(struct ceph_messenger *msgr, list_del_init(&con->list_bucket); } } + if (test_bit(ACCEPTING, &con->state)) + list_del_init(&con->list_bucket); put_connection(con); } @@ -216,6 +216,7 @@ static void remove_connection(struct ceph_messenger *msgr, spin_unlock(&msgr->con_lock); } + /* * atomically queue read or write work on a connection. * bump reference to avoid races. @@ -259,7 +260,7 @@ void ceph_queue_read(struct ceph_connection *con) * failure case * A retry mechanism is used with exponential backoff */ -static void ceph_send_fault(struct ceph_connection *con) +static void ceph_fault(struct ceph_connection *con) { derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name), IPQUADPORT(con->peer_addr.ipaddr), con->error_msg); @@ -532,7 +533,7 @@ static void try_write(struct work_struct *work) } if (test_and_clear_bit(SOCK_CLOSE, &con->state)) { - ceph_send_fault(con); + ceph_fault(con); goto done; } more: @@ -548,8 +549,8 @@ more: con, con->state); ret = ceph_tcp_connect(con); if (ret < 0) { - derr(1, "try_write tcp connect error %d\n", ret); - remove_connection(msgr, con); + con->error_msg = "connect error"; + ceph_fault(con); goto done; } } @@ -884,7 +885,7 @@ static void process_connect(struct ceph_connection *con) derr(1, "process_connect protocol error, will retry\n"); con->delay = BASE_DELAY_INTERVAL; con->error_msg = "protocol error"; - ceph_send_fault(con); + ceph_fault(con); } if (test_bit(WRITE_PENDING, &con->state)) ceph_queue_write(con); @@ -1019,7 +1020,7 @@ static void process_accept(struct ceph_connection *con) prepare_write_accept_reply(con, &tag_reset); } else { dout(20, "process_accept no existing connection, opening\n"); - __add_connection(msgr, con); + __register_connection(msgr, con); set_bit(OPEN, &con->state); con->connect_seq = peer_cseq + 1; prepare_write_accept_reply(con, &tag_ready); @@ -1212,7 +1213,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) spin_lock_init(&msgr->con_lock); INIT_LIST_HEAD(&msgr->con_all); INIT_LIST_HEAD(&msgr->con_accepting); - INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC); + INIT_RADIX_TREE(&msgr->con_tree, GFP_KERNEL); /* pick listening address */ if (myaddr) { @@ -1348,7 +1349,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, con = newcon; con->peer_addr = msg->hdr.dst.addr; con->peer_name = msg->hdr.dst.name; - __add_connection(msgr, con); + __register_connection(msgr, con); dout(5, "ceph_msg_send new connection %p to peer " "%u.%u.%u.%u:%u\n", con, IPQUADPORT(msg->hdr.dst.addr.ipaddr)); diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index dc7acfe0de601..e602a79e19085 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -42,8 +42,8 @@ struct ceph_messenger { struct work_struct awork; /* accept work */ spinlock_t con_lock; struct list_head con_all; /* all connections */ - struct list_head con_accepting; /* doing handshake, or */ - struct radix_tree_root con_open; /* established */ + struct list_head con_accepting; /* accepting */ + struct radix_tree_root con_tree; /* established */ }; struct ceph_msg { @@ -76,6 +76,7 @@ struct ceph_msg_pos { #define CLOSED 8 /* we've closed the connection */ #define SOCK_CLOSE 9 /* socket state changed to close */ #define STANDBY 10 /* standby, when socket state close, no messages */ +#define REGISTERED 11 struct ceph_connection { struct ceph_messenger *msgr; @@ -86,7 +87,7 @@ struct ceph_connection { atomic_t nref; struct list_head list_all; /* msgr->con_all */ - struct list_head list_bucket; /* msgr->con_open or con_accepting */ + struct list_head list_bucket; /* msgr->con_tree or con_accepting */ struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ -- 2.39.5