]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: do not accept connections in messager (client only!)
authorSage Weil <sage@newdream.net>
Tue, 25 Aug 2009 23:23:35 +0000 (16:23 -0700)
committerSage Weil <sage@newdream.net>
Wed, 26 Aug 2009 20:09:09 +0000 (13:09 -0700)
This rips out a lot of code, yay!

src/kernel/messenger.c
src/kernel/messenger.h

index e5400dcc4e0b2c009af40e82fab89ebf67d1db16..8d618f23ba9410551157d0fe51e0caf6e06491e5 100644 (file)
@@ -68,17 +68,6 @@ void ceph_msgr_exit(void)
  * socket callback functions
  */
 
-/* listen socket received a connection */
-static void ceph_accept_ready(struct sock *sk, int count_unused)
-{
-       struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
-
-       dout("ceph_accept_ready messenger %p sk_state = %u\n",
-            msgr, sk->sk_state);
-       if (sk->sk_state == TCP_LISTEN)
-               queue_work(ceph_msgr_wq, &msgr->awork);
-}
-
 /* data available on socket, or listen socket received a connect */
 static void ceph_data_ready(struct sock *sk, int count_unused)
 {
@@ -143,14 +132,6 @@ static void ceph_state_change(struct sock *sk)
 /*
  * set up socket callbacks
  */
-static void listen_sock_callbacks(struct socket *sock,
-                                 struct ceph_messenger *msgr)
-{
-       struct sock *sk = sock->sk;
-       sk->sk_user_data = (void *)msgr;
-       sk->sk_data_ready = ceph_accept_ready;
-}
-
 static void set_sock_callbacks(struct socket *sock,
                               struct ceph_connection *con)
 {
@@ -208,89 +189,6 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
        return sock;
 }
 
-/*
- * set up listening socket
- */
-static int ceph_tcp_listen(struct ceph_messenger *msgr)
-{
-       int ret;
-       int optval = 1;
-       struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
-       int nlen;
-       struct socket *sock;
-
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
-       if (ret)
-               return ret;
-       sock->sk->sk_allocation = GFP_NOFS;
-       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)&optval, sizeof(optval));
-       if (ret < 0) {
-               pr_err("ceph failed to set SO_REUSEADDR: %d\n", ret);
-               goto err;
-       }
-
-       ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
-                             sizeof(*myaddr));
-       if (ret < 0) {
-               pr_err("ceph failed to bind: %d\n", ret);
-               goto err;
-       }
-
-       /* what port did we bind to? */
-       nlen = sizeof(*myaddr);
-       ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen,
-                                0);
-       if (ret < 0) {
-               pr_err("ceph failed to getsockname: %d\n", ret);
-               goto err;
-       }
-       pr_info("ceph listening on %u.%u.%u.%u:%u\n", IPQUADPORT(*myaddr));
-
-       /* we don't care too much if this works or not */
-       sock->ops->listen(sock, CEPH_MSGR_BACKUP);
-
-       /* ok! */
-       msgr->listen_sock = sock;
-       listen_sock_callbacks(sock, msgr);
-       return 0;
-
-err:
-       sock_release(sock);
-       return ret;
-}
-
-/*
- * accept a connection
- */
-static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
-{
-       struct socket *sock;
-       int ret;
-
-       ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
-       if (ret)
-               return ret;
-       con->sock = sock;
-       sock->sk->sk_allocation = GFP_NOFS;
-
-       ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
-       if (ret < 0) {
-               pr_err("ceph accept error: %d\n", ret);
-               goto err;
-       }
-
-       sock->ops = lsock->ops;
-       sock->type = lsock->type;
-       set_sock_callbacks(sock, con);
-       return ret;
-
-err:
-       sock->ops->shutdown(sock, SHUT_RDWR);
-       sock_release(sock);
-       return ret;
-}
-
 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 {
        struct kvec iov = {buf, len};
@@ -438,14 +336,7 @@ static int __register_connection(struct ceph_messenger *msgr,
             atomic_read(&con->nref), atomic_read(&con->nref) + 1);
        atomic_inc(&con->nref);
 
-       /* if were just ACCEPTING this connection, it is already on the
-        * con_all and con_accepting lists. */
-       if (test_and_clear_bit(ACCEPTING, &con->state)) {
-               list_del_init(&con->list_bucket);
-               put_connection(con);
-       } else {
-               list_add(&con->list_all, &msgr->con_all);
-       }
+       list_add(&con->list_all, &msgr->con_all);
 
        head = radix_tree_lookup(&msgr->con_tree, key);
        if (head) {
@@ -467,20 +358,6 @@ static int __register_connection(struct ceph_messenger *msgr,
        return 0;
 }
 
-/*
- * called under con_lock.
- */
-static void add_connection_accepting(struct ceph_messenger *msgr,
-                                    struct ceph_connection *con)
-{
-       dout("add_connection_accepting %p nref = %d -> %d\n", con,
-            atomic_read(&con->nref), atomic_read(&con->nref) + 1);
-       atomic_inc(&con->nref);
-       spin_lock(&msgr->con_lock);
-       list_add(&con->list_all, &msgr->con_all);
-       spin_unlock(&msgr->con_lock);
-}
-
 /*
  * Remove connection from all list.  Also, from con_tree, if it should
  * have been there.
@@ -523,8 +400,6 @@ static void __remove_connection(struct ceph_messenger *msgr,
                        list_del_init(&con->list_bucket);
                }
        }
-       if (test_and_clear_bit(ACCEPTING, &con->state))
-               list_del_init(&con->list_bucket);
        put_connection(con);
 }
 
@@ -536,49 +411,6 @@ static void remove_connection(struct ceph_messenger *msgr,
        spin_unlock(&msgr->con_lock);
 }
 
-/*
- * replace another connection
- *  (old and new should be for the _same_ peer,
- *   and thus in the same bucket in the radix tree)
- */
-static void __replace_connection(struct ceph_messenger *msgr,
-                                struct ceph_connection *old,
-                                struct ceph_connection *new)
-{
-       unsigned long key = hash_addr(&new->peer_addr);
-       void **slot;
-
-       dout("replace_connection %p with %p\n", old, new);
-
-       /* replace in con_tree */
-       slot = radix_tree_lookup_slot(&msgr->con_tree, key);
-       if (*slot == &old->list_bucket)
-               radix_tree_replace_slot(slot, &new->list_bucket);
-       else
-               BUG_ON(list_empty(&old->list_bucket));
-       if (!list_empty(&old->list_bucket)) {
-               /* replace old with new in bucket list */
-               list_add(&new->list_bucket, &old->list_bucket);
-               list_del_init(&old->list_bucket);
-       }
-
-       /* take old connections message queue */
-       spin_lock(&old->out_queue_lock);
-       if (!list_empty(&old->out_queue))
-               list_splice_init(&new->out_queue, &old->out_queue);
-       spin_unlock(&old->out_queue_lock);
-
-       new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
-       new->out_seq = old->out_seq;
-       new->peer_name = old->peer_name;
-
-       set_bit(CLOSED, &old->state);
-       put_connection(old); /* dec reference count */
-
-       clear_bit(ACCEPTING, &new->state);
-}
-
-
 
 
 /*
@@ -773,52 +605,6 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr,
        set_bit(WRITE_PENDING, &con->state);
 }
 
-/*
- * We accepted a connection and are saying hello.
- */
-static void prepare_write_accept_hello(struct ceph_messenger *msgr,
-                                      struct ceph_connection *con)
-{
-       int len = strlen(CEPH_BANNER);
-
-       dout("prepare_write_accept_hello %p\n", con);
-       con->out_kvec[0].iov_base = CEPH_BANNER;
-       con->out_kvec[0].iov_len = len;
-       con->out_kvec[1].iov_base = &msgr->inst.addr;
-       con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
-       con->out_kvec_left = 2;
-       con->out_kvec_bytes = len + sizeof(msgr->inst.addr);
-       con->out_kvec_cur = con->out_kvec;
-       con->out_more = 0;
-       set_bit(WRITE_PENDING, &con->state);
-}
-
-/*
- * Reply to a connect attempt, indicating whether the negotiation has
- * succeeded or must continue.
- */
-static void prepare_write_accept_reply(struct ceph_connection *con, bool retry)
-{
-       dout("prepare_write_accept_reply %p\n", con);
-       con->out_reply.flags = 0;
-       if (test_bit(LOSSYTX, &con->state))
-               con->out_reply.flags = CEPH_MSG_CONNECT_LOSSY;
-
-       con->out_kvec[0].iov_base = &con->out_reply;
-       con->out_kvec[0].iov_len = sizeof(con->out_reply);
-       con->out_kvec_left = 1;
-       con->out_kvec_bytes = sizeof(con->out_reply);
-       con->out_kvec_cur = con->out_kvec;
-       con->out_more = 0;
-       set_bit(WRITE_PENDING, &con->state);
-
-       if (retry)
-               /* we'll re-read the connect request, sans the hello + addr */
-               con->in_base_pos = strlen(CEPH_BANNER) +
-                       sizeof(con->msgr->inst.addr);
-}
-
-
 
 /*
  * write as much of pending kvecs to the socket as we can.
@@ -1182,176 +968,6 @@ static int process_connect(struct ceph_connection *con)
 }
 
 
-/*
- * Read all or part of the accept-side handshake on a newly accepted
- * connection.
- */
-static int read_partial_accept(struct ceph_connection *con)
-{
-       int ret;
-       int to = 0;
-
-       /* banner */
-       ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
-       if (ret <= 0)
-               return ret;
-       ret = read_partial(con, &to, sizeof(con->peer_addr), &con->peer_addr);
-       if (ret <= 0)
-               return ret;
-       ret = read_partial(con, &to, sizeof(con->in_connect), &con->in_connect);
-       if (ret <= 0)
-               return ret;
-       return 1;
-}
-
-/*
- * Call after a new connection's handshake has been read.
- */
-static int process_accept(struct ceph_connection *con)
-{
-       struct ceph_connection *existing;
-       struct ceph_messenger *msgr = con->msgr;
-       u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
-       u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
-       bool retry = true;
-       bool replace = false;
-
-       dout("process_accept %p got gseq %d cseq %d\n", con,
-            peer_gseq, peer_cseq);
-
-       if (verify_hello(con) < 0)
-               return -1;
-
-       /* note flags */
-       if (con->in_connect.flags & CEPH_MSG_CONNECT_LOSSY)
-               set_bit(LOSSYRX, &con->state);
-
-       /* do we have an existing connection for this peer? */
-       if (radix_tree_preload(GFP_NOFS) < 0) {
-               pr_err("ceph ENOMEM in process_accept\n");
-               con->error_msg = "out of memory";
-               return -1;
-       }
-
-       memset(&con->out_reply, 0, sizeof(con->out_reply));
-
-       spin_lock(&msgr->con_lock);
-       existing = __get_connection(msgr, &con->peer_addr);
-       if (existing) {
-               if (peer_gseq < existing->peer_global_seq) {
-                       /* out of order connection attempt */
-                       con->out_reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
-                       con->out_reply.global_seq =
-                               cpu_to_le32(con->peer_global_seq);
-                       goto reply;
-               }
-               if (test_bit(LOSSYTX, &existing->state)) {
-                       dout("process_accept %p replacing LOSSYTX %p\n",
-                            con, existing);
-                       replace = true;
-                       goto accept;
-               }
-               if (peer_cseq < existing->connect_seq) {
-                       if (peer_cseq == 0) {
-                               /* peer reset, then connected to us */
-                               reset_connection(existing);
-                               pr_info("reset on %s%d\n", ENTITY_NAME(con->peer_name));
-                               con->msgr->peer_reset(con->msgr->parent,
-                                                     &con->peer_addr,
-                                                     &con->peer_name);
-                               replace = true;
-                               goto accept;
-                       }
-
-                       /* old attempt or peer didn't get the READY */
-                       con->out_reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
-                       con->out_reply.connect_seq =
-                               cpu_to_le32(existing->connect_seq);
-                       goto reply;
-               }
-
-               if (peer_cseq == existing->connect_seq) {
-                       /* connection race */
-                       dout("process_accept connection race state = %lu\n",
-                            con->state);
-                       if (ceph_entity_addr_equal(&msgr->inst.addr,
-                                                  &con->peer_addr)) {
-                               /* incoming connection wins.. */
-                               replace = true;
-                               goto accept;
-                       }
-
-                       /* our existing outgoing connection wins, tell peer
-                          to wait for our outging connection to go through */
-                       con->out_reply.tag = CEPH_MSGR_TAG_WAIT;
-                       goto reply;
-               }
-
-               if (existing->connect_seq == 0 &&
-                   peer_cseq > existing->connect_seq) {
-                       /* we reset and already reconnecting */
-                       con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION;
-                       goto reply;
-               }
-
-               WARN_ON(le32_to_cpu(con->in_connect.connect_seq) <=
-                                               existing->connect_seq);
-               WARN_ON(le32_to_cpu(con->in_connect.global_seq) <
-                                               existing->peer_global_seq);
-               if (existing->connect_seq == 0) {
-                       /* we reset, sending RESETSESSION */
-                       con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION;
-                       goto reply;
-               }
-
-               /* reconnect, replace connection */
-               replace = true;
-               goto accept;
-       }
-
-       if (peer_cseq == 0) {
-               dout("process_accept no existing connection, opening\n");
-               goto accept;
-       } else {
-               dout("process_accept no existing connection, we reset\n");
-               con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION;
-               goto reply;
-       }
-
-
-accept:
-       /* accept this connection */
-       con->connect_seq = peer_cseq + 1;
-       con->peer_global_seq = peer_gseq;
-       dout("process_accept %p cseq %d peer_gseq %d %s\n", con,
-            con->connect_seq, peer_gseq, replace ? "replace" : "new");
-
-       con->out_reply.tag = CEPH_MSGR_TAG_READY;
-       con->out_reply.global_seq = cpu_to_le32(get_global_seq(con->msgr, 0));
-       con->out_reply.connect_seq = cpu_to_le32(peer_cseq + 1);
-
-       retry = false;
-       prepare_read_tag(con);
-
-       /* do this _after_ con is ready to go */
-       if (replace)
-               __replace_connection(msgr, existing, con);
-       else
-               __register_connection(msgr, con);
-       put_connection(con);
-
-reply:
-       if (existing)
-               put_connection(existing);
-       prepare_write_accept_reply(con, retry);
-
-       spin_unlock(&msgr->con_lock);
-       radix_tree_preload_end();
-
-       ceph_queue_con(con);
-       return 0;
-}
-
 /*
  * read (part of) an ack
  */
@@ -1766,17 +1382,6 @@ static int try_read(struct ceph_connection *con)
 more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
             con->in_base_pos);
-       if (test_bit(ACCEPTING, &con->state)) {
-               dout("try_read accepting\n");
-               ret = read_partial_accept(con);
-               if (ret <= 0)
-                       goto done;
-               if (process_accept(con) < 0) {
-                       ret = -1;
-                       goto out;
-               }
-               goto more;
-       }
        if (test_bit(CONNECTING, &con->state)) {
                dout("try_read connecting\n");
                ret = read_partial_connect(con);
@@ -2011,42 +1616,6 @@ static void ceph_fault(struct ceph_connection *con)
 }
 
 
-/*
- * Handle an incoming connection.
- */
-static void accept_work(struct work_struct *work)
-{
-       struct ceph_connection *newcon = NULL;
-       struct ceph_messenger *msgr = container_of(work, struct ceph_messenger,
-                                                  awork);
-
-       /* initialize the msgr connection */
-       newcon = new_connection(msgr);
-       if (newcon == NULL) {
-               pr_err("ceph ENOMEM accepting new connection\n");
-               return;
-       }
-
-       set_bit(ACCEPTING, &newcon->state);
-       newcon->connect_seq = 1;
-       newcon->in_tag = CEPH_MSGR_TAG_READY;  /* eventually, hopefully */
-
-       if (ceph_tcp_accept(msgr->listen_sock, newcon) < 0) {
-               pr_err("ceph error accepting connection\n");
-               put_connection(newcon);
-               return;
-       }
-       dout("accepted connection \n");
-
-       prepare_write_accept_hello(msgr, newcon);
-       add_connection_accepting(msgr, newcon);
-
-       /* queue work explicitly; we may have missed the socket state
-        * change before setting the socket callbacks. */
-       ceph_queue_con(newcon);
-}
-
-
 
 /*
  * create a new messenger instance, creates listening socket
@@ -2054,16 +1623,13 @@ static void accept_work(struct work_struct *work)
 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
 {
        struct ceph_messenger *msgr;
-       int ret = 0;
 
        msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
        if (msgr == NULL)
                return ERR_PTR(-ENOMEM);
 
-       INIT_WORK(&msgr->awork, accept_work);
        spin_lock_init(&msgr->con_lock);
        INIT_LIST_HEAD(&msgr->con_all);
-       INIT_LIST_HEAD(&msgr->con_accepting);
        INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC);
        spin_lock_init(&msgr->global_seq_lock);
 
@@ -2086,17 +1652,10 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
        }
        msgr->inst.addr.ipaddr.sin_family = AF_INET;
 
-       /* create listening socket */
-       ret = ceph_tcp_listen(msgr);
-       if (ret < 0) {
-               kfree(msgr);
-               return ERR_PTR(ret);
-       }
        if (myaddr)
                msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr;
 
-       dout("messenger %p listening on %u.%u.%u.%u:%u\n", msgr,
-            IPQUADPORT(msgr->inst.addr.ipaddr));
+       dout("messenger_create %p\n", msgr);
        return msgr;
 }
 
@@ -2106,11 +1665,6 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
 
        dout("destroy %p\n", msgr);
 
-       /* stop listener */
-       msgr->listen_sock->ops->shutdown(msgr->listen_sock, SHUT_RDWR);
-       sock_release(msgr->listen_sock);
-       cancel_work_sync(&msgr->awork);
-
        /* kill off connections */
        spin_lock(&msgr->con_lock);
        while (!list_empty(&msgr->con_all)) {
index 882728c51e8de3f0b4dc0d2bdcbdd434e67e7e99..36e935d31468127e26129451dd31a881e1a52c2f 100644 (file)
@@ -71,12 +71,8 @@ struct ceph_messenger {
 
        struct ceph_entity_inst inst;    /* my name+address */
 
-       struct socket *listen_sock;      /* listening socket */
-       struct work_struct awork;        /* accept work */
-
        spinlock_t con_lock;
        struct list_head con_all;        /* all open connections */
-       struct list_head con_accepting;  /*  accepting */
        struct radix_tree_root con_tree; /*  established */
 
        struct page *zero_page;          /* used in certain error cases */
@@ -131,7 +127,6 @@ struct ceph_msg_pos {
 #define LOSSYTX         0  /* we can close channel or drop messages on errors */
 #define LOSSYRX         1  /* peer may reset/drop messages */
 #define CONNECTING     2
-#define ACCEPTING      3
 #define WRITE_PENDING  4  /* we have data ready to send */
 #define QUEUED          5  /* there is work queued on this connection */
 #define BUSY            6  /* work is being done */
@@ -160,7 +155,7 @@ struct ceph_connection {
        atomic_t nref;
 
        struct list_head list_all;     /* msgr->con_all */
-       struct list_head list_bucket;  /* msgr->con_tree or con_accepting */
+       struct list_head list_bucket;
 
        struct ceph_entity_addr peer_addr; /* peer address */
        struct ceph_entity_name peer_name; /* peer name */