From e11a7e49b2ccb32592e5c47b993c25505647d5e8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 25 Aug 2009 16:23:35 -0700 Subject: [PATCH] kclient: do not accept connections in messager (client only!) This rips out a lot of code, yay! --- src/kernel/messenger.c | 450 +---------------------------------------- src/kernel/messenger.h | 7 +- 2 files changed, 3 insertions(+), 454 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index e5400dcc4e0b2..8d618f23ba941 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -68,17 +68,6 @@ void ceph_msgr_exit(void) * socket callback functions */ -/* listen socket received a connection */ -static void ceph_accept_ready(struct sock *sk, int count_unused) -{ - struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data; - - dout("ceph_accept_ready messenger %p sk_state = %u\n", - msgr, sk->sk_state); - if (sk->sk_state == TCP_LISTEN) - queue_work(ceph_msgr_wq, &msgr->awork); -} - /* data available on socket, or listen socket received a connect */ static void ceph_data_ready(struct sock *sk, int count_unused) { @@ -143,14 +132,6 @@ static void ceph_state_change(struct sock *sk) /* * set up socket callbacks */ -static void listen_sock_callbacks(struct socket *sock, - struct ceph_messenger *msgr) -{ - struct sock *sk = sock->sk; - sk->sk_user_data = (void *)msgr; - sk->sk_data_ready = ceph_accept_ready; -} - static void set_sock_callbacks(struct socket *sock, struct ceph_connection *con) { @@ -208,89 +189,6 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con) return sock; } -/* - * set up listening socket - */ -static int ceph_tcp_listen(struct ceph_messenger *msgr) -{ - int ret; - int optval = 1; - struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr; - int nlen; - struct socket *sock; - - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); - if (ret) - return ret; - sock->sk->sk_allocation = GFP_NOFS; - ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&optval, sizeof(optval)); - if (ret < 0) { - pr_err("ceph failed to set SO_REUSEADDR: %d\n", ret); - goto err; - } - - ret = sock->ops->bind(sock, (struct sockaddr *)myaddr, - sizeof(*myaddr)); - if (ret < 0) { - pr_err("ceph failed to bind: %d\n", ret); - goto err; - } - - /* what port did we bind to? */ - nlen = sizeof(*myaddr); - ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen, - 0); - if (ret < 0) { - pr_err("ceph failed to getsockname: %d\n", ret); - goto err; - } - pr_info("ceph listening on %u.%u.%u.%u:%u\n", IPQUADPORT(*myaddr)); - - /* we don't care too much if this works or not */ - sock->ops->listen(sock, CEPH_MSGR_BACKUP); - - /* ok! */ - msgr->listen_sock = sock; - listen_sock_callbacks(sock, msgr); - return 0; - -err: - sock_release(sock); - return ret; -} - -/* - * accept a connection - */ -static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con) -{ - struct socket *sock; - int ret; - - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); - if (ret) - return ret; - con->sock = sock; - sock->sk->sk_allocation = GFP_NOFS; - - ret = lsock->ops->accept(lsock, sock, O_NONBLOCK); - if (ret < 0) { - pr_err("ceph accept error: %d\n", ret); - goto err; - } - - sock->ops = lsock->ops; - sock->type = lsock->type; - set_sock_callbacks(sock, con); - return ret; - -err: - sock->ops->shutdown(sock, SHUT_RDWR); - sock_release(sock); - return ret; -} - static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) { struct kvec iov = {buf, len}; @@ -438,14 +336,7 @@ static int __register_connection(struct ceph_messenger *msgr, atomic_read(&con->nref), atomic_read(&con->nref) + 1); atomic_inc(&con->nref); - /* if were just ACCEPTING this connection, it is already on the - * con_all and con_accepting lists. */ - if (test_and_clear_bit(ACCEPTING, &con->state)) { - list_del_init(&con->list_bucket); - put_connection(con); - } else { - list_add(&con->list_all, &msgr->con_all); - } + list_add(&con->list_all, &msgr->con_all); head = radix_tree_lookup(&msgr->con_tree, key); if (head) { @@ -467,20 +358,6 @@ static int __register_connection(struct ceph_messenger *msgr, return 0; } -/* - * called under con_lock. - */ -static void add_connection_accepting(struct ceph_messenger *msgr, - struct ceph_connection *con) -{ - dout("add_connection_accepting %p nref = %d -> %d\n", con, - atomic_read(&con->nref), atomic_read(&con->nref) + 1); - atomic_inc(&con->nref); - spin_lock(&msgr->con_lock); - list_add(&con->list_all, &msgr->con_all); - spin_unlock(&msgr->con_lock); -} - /* * Remove connection from all list. Also, from con_tree, if it should * have been there. @@ -523,8 +400,6 @@ static void __remove_connection(struct ceph_messenger *msgr, list_del_init(&con->list_bucket); } } - if (test_and_clear_bit(ACCEPTING, &con->state)) - list_del_init(&con->list_bucket); put_connection(con); } @@ -536,49 +411,6 @@ static void remove_connection(struct ceph_messenger *msgr, spin_unlock(&msgr->con_lock); } -/* - * replace another connection - * (old and new should be for the _same_ peer, - * and thus in the same bucket in the radix tree) - */ -static void __replace_connection(struct ceph_messenger *msgr, - struct ceph_connection *old, - struct ceph_connection *new) -{ - unsigned long key = hash_addr(&new->peer_addr); - void **slot; - - dout("replace_connection %p with %p\n", old, new); - - /* replace in con_tree */ - slot = radix_tree_lookup_slot(&msgr->con_tree, key); - if (*slot == &old->list_bucket) - radix_tree_replace_slot(slot, &new->list_bucket); - else - BUG_ON(list_empty(&old->list_bucket)); - if (!list_empty(&old->list_bucket)) { - /* replace old with new in bucket list */ - list_add(&new->list_bucket, &old->list_bucket); - list_del_init(&old->list_bucket); - } - - /* take old connections message queue */ - spin_lock(&old->out_queue_lock); - if (!list_empty(&old->out_queue)) - list_splice_init(&new->out_queue, &old->out_queue); - spin_unlock(&old->out_queue_lock); - - new->connect_seq = le32_to_cpu(new->in_connect.connect_seq); - new->out_seq = old->out_seq; - new->peer_name = old->peer_name; - - set_bit(CLOSED, &old->state); - put_connection(old); /* dec reference count */ - - clear_bit(ACCEPTING, &new->state); -} - - /* @@ -773,52 +605,6 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr, set_bit(WRITE_PENDING, &con->state); } -/* - * We accepted a connection and are saying hello. - */ -static void prepare_write_accept_hello(struct ceph_messenger *msgr, - struct ceph_connection *con) -{ - int len = strlen(CEPH_BANNER); - - dout("prepare_write_accept_hello %p\n", con); - con->out_kvec[0].iov_base = CEPH_BANNER; - con->out_kvec[0].iov_len = len; - con->out_kvec[1].iov_base = &msgr->inst.addr; - con->out_kvec[1].iov_len = sizeof(msgr->inst.addr); - con->out_kvec_left = 2; - con->out_kvec_bytes = len + sizeof(msgr->inst.addr); - con->out_kvec_cur = con->out_kvec; - con->out_more = 0; - set_bit(WRITE_PENDING, &con->state); -} - -/* - * Reply to a connect attempt, indicating whether the negotiation has - * succeeded or must continue. - */ -static void prepare_write_accept_reply(struct ceph_connection *con, bool retry) -{ - dout("prepare_write_accept_reply %p\n", con); - con->out_reply.flags = 0; - if (test_bit(LOSSYTX, &con->state)) - con->out_reply.flags = CEPH_MSG_CONNECT_LOSSY; - - con->out_kvec[0].iov_base = &con->out_reply; - con->out_kvec[0].iov_len = sizeof(con->out_reply); - con->out_kvec_left = 1; - con->out_kvec_bytes = sizeof(con->out_reply); - con->out_kvec_cur = con->out_kvec; - con->out_more = 0; - set_bit(WRITE_PENDING, &con->state); - - if (retry) - /* we'll re-read the connect request, sans the hello + addr */ - con->in_base_pos = strlen(CEPH_BANNER) + - sizeof(con->msgr->inst.addr); -} - - /* * write as much of pending kvecs to the socket as we can. @@ -1182,176 +968,6 @@ static int process_connect(struct ceph_connection *con) } -/* - * Read all or part of the accept-side handshake on a newly accepted - * connection. - */ -static int read_partial_accept(struct ceph_connection *con) -{ - int ret; - int to = 0; - - /* banner */ - ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); - if (ret <= 0) - return ret; - ret = read_partial(con, &to, sizeof(con->peer_addr), &con->peer_addr); - if (ret <= 0) - return ret; - ret = read_partial(con, &to, sizeof(con->in_connect), &con->in_connect); - if (ret <= 0) - return ret; - return 1; -} - -/* - * Call after a new connection's handshake has been read. - */ -static int process_accept(struct ceph_connection *con) -{ - struct ceph_connection *existing; - struct ceph_messenger *msgr = con->msgr; - u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq); - u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq); - bool retry = true; - bool replace = false; - - dout("process_accept %p got gseq %d cseq %d\n", con, - peer_gseq, peer_cseq); - - if (verify_hello(con) < 0) - return -1; - - /* note flags */ - if (con->in_connect.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(LOSSYRX, &con->state); - - /* do we have an existing connection for this peer? */ - if (radix_tree_preload(GFP_NOFS) < 0) { - pr_err("ceph ENOMEM in process_accept\n"); - con->error_msg = "out of memory"; - return -1; - } - - memset(&con->out_reply, 0, sizeof(con->out_reply)); - - spin_lock(&msgr->con_lock); - existing = __get_connection(msgr, &con->peer_addr); - if (existing) { - if (peer_gseq < existing->peer_global_seq) { - /* out of order connection attempt */ - con->out_reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; - con->out_reply.global_seq = - cpu_to_le32(con->peer_global_seq); - goto reply; - } - if (test_bit(LOSSYTX, &existing->state)) { - dout("process_accept %p replacing LOSSYTX %p\n", - con, existing); - replace = true; - goto accept; - } - if (peer_cseq < existing->connect_seq) { - if (peer_cseq == 0) { - /* peer reset, then connected to us */ - reset_connection(existing); - pr_info("reset on %s%d\n", ENTITY_NAME(con->peer_name)); - con->msgr->peer_reset(con->msgr->parent, - &con->peer_addr, - &con->peer_name); - replace = true; - goto accept; - } - - /* old attempt or peer didn't get the READY */ - con->out_reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; - con->out_reply.connect_seq = - cpu_to_le32(existing->connect_seq); - goto reply; - } - - if (peer_cseq == existing->connect_seq) { - /* connection race */ - dout("process_accept connection race state = %lu\n", - con->state); - if (ceph_entity_addr_equal(&msgr->inst.addr, - &con->peer_addr)) { - /* incoming connection wins.. */ - replace = true; - goto accept; - } - - /* our existing outgoing connection wins, tell peer - to wait for our outging connection to go through */ - con->out_reply.tag = CEPH_MSGR_TAG_WAIT; - goto reply; - } - - if (existing->connect_seq == 0 && - peer_cseq > existing->connect_seq) { - /* we reset and already reconnecting */ - con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION; - goto reply; - } - - WARN_ON(le32_to_cpu(con->in_connect.connect_seq) <= - existing->connect_seq); - WARN_ON(le32_to_cpu(con->in_connect.global_seq) < - existing->peer_global_seq); - if (existing->connect_seq == 0) { - /* we reset, sending RESETSESSION */ - con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION; - goto reply; - } - - /* reconnect, replace connection */ - replace = true; - goto accept; - } - - if (peer_cseq == 0) { - dout("process_accept no existing connection, opening\n"); - goto accept; - } else { - dout("process_accept no existing connection, we reset\n"); - con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION; - goto reply; - } - - -accept: - /* accept this connection */ - con->connect_seq = peer_cseq + 1; - con->peer_global_seq = peer_gseq; - dout("process_accept %p cseq %d peer_gseq %d %s\n", con, - con->connect_seq, peer_gseq, replace ? "replace" : "new"); - - con->out_reply.tag = CEPH_MSGR_TAG_READY; - con->out_reply.global_seq = cpu_to_le32(get_global_seq(con->msgr, 0)); - con->out_reply.connect_seq = cpu_to_le32(peer_cseq + 1); - - retry = false; - prepare_read_tag(con); - - /* do this _after_ con is ready to go */ - if (replace) - __replace_connection(msgr, existing, con); - else - __register_connection(msgr, con); - put_connection(con); - -reply: - if (existing) - put_connection(existing); - prepare_write_accept_reply(con, retry); - - spin_unlock(&msgr->con_lock); - radix_tree_preload_end(); - - ceph_queue_con(con); - return 0; -} - /* * read (part of) an ack */ @@ -1766,17 +1382,6 @@ static int try_read(struct ceph_connection *con) more: dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, con->in_base_pos); - if (test_bit(ACCEPTING, &con->state)) { - dout("try_read accepting\n"); - ret = read_partial_accept(con); - if (ret <= 0) - goto done; - if (process_accept(con) < 0) { - ret = -1; - goto out; - } - goto more; - } if (test_bit(CONNECTING, &con->state)) { dout("try_read connecting\n"); ret = read_partial_connect(con); @@ -2011,42 +1616,6 @@ static void ceph_fault(struct ceph_connection *con) } -/* - * Handle an incoming connection. - */ -static void accept_work(struct work_struct *work) -{ - struct ceph_connection *newcon = NULL; - struct ceph_messenger *msgr = container_of(work, struct ceph_messenger, - awork); - - /* initialize the msgr connection */ - newcon = new_connection(msgr); - if (newcon == NULL) { - pr_err("ceph ENOMEM accepting new connection\n"); - return; - } - - set_bit(ACCEPTING, &newcon->state); - newcon->connect_seq = 1; - newcon->in_tag = CEPH_MSGR_TAG_READY; /* eventually, hopefully */ - - if (ceph_tcp_accept(msgr->listen_sock, newcon) < 0) { - pr_err("ceph error accepting connection\n"); - put_connection(newcon); - return; - } - dout("accepted connection \n"); - - prepare_write_accept_hello(msgr, newcon); - add_connection_accepting(msgr, newcon); - - /* queue work explicitly; we may have missed the socket state - * change before setting the socket callbacks. */ - ceph_queue_con(newcon); -} - - /* * create a new messenger instance, creates listening socket @@ -2054,16 +1623,13 @@ static void accept_work(struct work_struct *work) struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) { struct ceph_messenger *msgr; - int ret = 0; msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); if (msgr == NULL) return ERR_PTR(-ENOMEM); - INIT_WORK(&msgr->awork, accept_work); spin_lock_init(&msgr->con_lock); INIT_LIST_HEAD(&msgr->con_all); - INIT_LIST_HEAD(&msgr->con_accepting); INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC); spin_lock_init(&msgr->global_seq_lock); @@ -2086,17 +1652,10 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) } msgr->inst.addr.ipaddr.sin_family = AF_INET; - /* create listening socket */ - ret = ceph_tcp_listen(msgr); - if (ret < 0) { - kfree(msgr); - return ERR_PTR(ret); - } if (myaddr) msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr; - dout("messenger %p listening on %u.%u.%u.%u:%u\n", msgr, - IPQUADPORT(msgr->inst.addr.ipaddr)); + dout("messenger_create %p\n", msgr); return msgr; } @@ -2106,11 +1665,6 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) dout("destroy %p\n", msgr); - /* stop listener */ - msgr->listen_sock->ops->shutdown(msgr->listen_sock, SHUT_RDWR); - sock_release(msgr->listen_sock); - cancel_work_sync(&msgr->awork); - /* kill off connections */ spin_lock(&msgr->con_lock); while (!list_empty(&msgr->con_all)) { diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 882728c51e8de..36e935d314681 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -71,12 +71,8 @@ struct ceph_messenger { struct ceph_entity_inst inst; /* my name+address */ - struct socket *listen_sock; /* listening socket */ - struct work_struct awork; /* accept work */ - spinlock_t con_lock; struct list_head con_all; /* all open connections */ - struct list_head con_accepting; /* accepting */ struct radix_tree_root con_tree; /* established */ struct page *zero_page; /* used in certain error cases */ @@ -131,7 +127,6 @@ struct ceph_msg_pos { #define LOSSYTX 0 /* we can close channel or drop messages on errors */ #define LOSSYRX 1 /* peer may reset/drop messages */ #define CONNECTING 2 -#define ACCEPTING 3 #define WRITE_PENDING 4 /* we have data ready to send */ #define QUEUED 5 /* there is work queued on this connection */ #define BUSY 6 /* work is being done */ @@ -160,7 +155,7 @@ struct ceph_connection { atomic_t nref; struct list_head list_all; /* msgr->con_all */ - struct list_head list_bucket; /* msgr->con_tree or con_accepting */ + struct list_head list_bucket; struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ -- 2.39.5