* socket callback functions
*/
-/* listen socket received a connection */
-static void ceph_accept_ready(struct sock *sk, int count_unused)
-{
- struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
-
- dout("ceph_accept_ready messenger %p sk_state = %u\n",
- msgr, sk->sk_state);
- if (sk->sk_state == TCP_LISTEN)
- queue_work(ceph_msgr_wq, &msgr->awork);
-}
-
/* data available on socket, or listen socket received a connect */
static void ceph_data_ready(struct sock *sk, int count_unused)
{
/*
* set up socket callbacks
*/
-static void listen_sock_callbacks(struct socket *sock,
- struct ceph_messenger *msgr)
-{
- struct sock *sk = sock->sk;
- sk->sk_user_data = (void *)msgr;
- sk->sk_data_ready = ceph_accept_ready;
-}
-
static void set_sock_callbacks(struct socket *sock,
struct ceph_connection *con)
{
return sock;
}
-/*
- * set up listening socket
- */
-static int ceph_tcp_listen(struct ceph_messenger *msgr)
-{
- int ret;
- int optval = 1;
- struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
- int nlen;
- struct socket *sock;
-
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
- if (ret)
- return ret;
- sock->sk->sk_allocation = GFP_NOFS;
- ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
- (char *)&optval, sizeof(optval));
- if (ret < 0) {
- pr_err("ceph failed to set SO_REUSEADDR: %d\n", ret);
- goto err;
- }
-
- ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
- sizeof(*myaddr));
- if (ret < 0) {
- pr_err("ceph failed to bind: %d\n", ret);
- goto err;
- }
-
- /* what port did we bind to? */
- nlen = sizeof(*myaddr);
- ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen,
- 0);
- if (ret < 0) {
- pr_err("ceph failed to getsockname: %d\n", ret);
- goto err;
- }
- pr_info("ceph listening on %u.%u.%u.%u:%u\n", IPQUADPORT(*myaddr));
-
- /* we don't care too much if this works or not */
- sock->ops->listen(sock, CEPH_MSGR_BACKUP);
-
- /* ok! */
- msgr->listen_sock = sock;
- listen_sock_callbacks(sock, msgr);
- return 0;
-
-err:
- sock_release(sock);
- return ret;
-}
-
-/*
- * accept a connection
- */
-static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
-{
- struct socket *sock;
- int ret;
-
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
- if (ret)
- return ret;
- con->sock = sock;
- sock->sk->sk_allocation = GFP_NOFS;
-
- ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
- if (ret < 0) {
- pr_err("ceph accept error: %d\n", ret);
- goto err;
- }
-
- sock->ops = lsock->ops;
- sock->type = lsock->type;
- set_sock_callbacks(sock, con);
- return ret;
-
-err:
- sock->ops->shutdown(sock, SHUT_RDWR);
- sock_release(sock);
- return ret;
-}
-
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
struct kvec iov = {buf, len};
atomic_read(&con->nref), atomic_read(&con->nref) + 1);
atomic_inc(&con->nref);
- /* if were just ACCEPTING this connection, it is already on the
- * con_all and con_accepting lists. */
- if (test_and_clear_bit(ACCEPTING, &con->state)) {
- list_del_init(&con->list_bucket);
- put_connection(con);
- } else {
- list_add(&con->list_all, &msgr->con_all);
- }
+ list_add(&con->list_all, &msgr->con_all);
head = radix_tree_lookup(&msgr->con_tree, key);
if (head) {
return 0;
}
-/*
- * called under con_lock.
- */
-static void add_connection_accepting(struct ceph_messenger *msgr,
- struct ceph_connection *con)
-{
- dout("add_connection_accepting %p nref = %d -> %d\n", con,
- atomic_read(&con->nref), atomic_read(&con->nref) + 1);
- atomic_inc(&con->nref);
- spin_lock(&msgr->con_lock);
- list_add(&con->list_all, &msgr->con_all);
- spin_unlock(&msgr->con_lock);
-}
-
/*
* Remove connection from all list. Also, from con_tree, if it should
* have been there.
list_del_init(&con->list_bucket);
}
}
- if (test_and_clear_bit(ACCEPTING, &con->state))
- list_del_init(&con->list_bucket);
put_connection(con);
}
spin_unlock(&msgr->con_lock);
}
-/*
- * replace another connection
- * (old and new should be for the _same_ peer,
- * and thus in the same bucket in the radix tree)
- */
-static void __replace_connection(struct ceph_messenger *msgr,
- struct ceph_connection *old,
- struct ceph_connection *new)
-{
- unsigned long key = hash_addr(&new->peer_addr);
- void **slot;
-
- dout("replace_connection %p with %p\n", old, new);
-
- /* replace in con_tree */
- slot = radix_tree_lookup_slot(&msgr->con_tree, key);
- if (*slot == &old->list_bucket)
- radix_tree_replace_slot(slot, &new->list_bucket);
- else
- BUG_ON(list_empty(&old->list_bucket));
- if (!list_empty(&old->list_bucket)) {
- /* replace old with new in bucket list */
- list_add(&new->list_bucket, &old->list_bucket);
- list_del_init(&old->list_bucket);
- }
-
- /* take old connections message queue */
- spin_lock(&old->out_queue_lock);
- if (!list_empty(&old->out_queue))
- list_splice_init(&new->out_queue, &old->out_queue);
- spin_unlock(&old->out_queue_lock);
-
- new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
- new->out_seq = old->out_seq;
- new->peer_name = old->peer_name;
-
- set_bit(CLOSED, &old->state);
- put_connection(old); /* dec reference count */
-
- clear_bit(ACCEPTING, &new->state);
-}
-
-
/*
set_bit(WRITE_PENDING, &con->state);
}
-/*
- * We accepted a connection and are saying hello.
- */
-static void prepare_write_accept_hello(struct ceph_messenger *msgr,
- struct ceph_connection *con)
-{
- int len = strlen(CEPH_BANNER);
-
- dout("prepare_write_accept_hello %p\n", con);
- con->out_kvec[0].iov_base = CEPH_BANNER;
- con->out_kvec[0].iov_len = len;
- con->out_kvec[1].iov_base = &msgr->inst.addr;
- con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
- con->out_kvec_left = 2;
- con->out_kvec_bytes = len + sizeof(msgr->inst.addr);
- con->out_kvec_cur = con->out_kvec;
- con->out_more = 0;
- set_bit(WRITE_PENDING, &con->state);
-}
-
-/*
- * Reply to a connect attempt, indicating whether the negotiation has
- * succeeded or must continue.
- */
-static void prepare_write_accept_reply(struct ceph_connection *con, bool retry)
-{
- dout("prepare_write_accept_reply %p\n", con);
- con->out_reply.flags = 0;
- if (test_bit(LOSSYTX, &con->state))
- con->out_reply.flags = CEPH_MSG_CONNECT_LOSSY;
-
- con->out_kvec[0].iov_base = &con->out_reply;
- con->out_kvec[0].iov_len = sizeof(con->out_reply);
- con->out_kvec_left = 1;
- con->out_kvec_bytes = sizeof(con->out_reply);
- con->out_kvec_cur = con->out_kvec;
- con->out_more = 0;
- set_bit(WRITE_PENDING, &con->state);
-
- if (retry)
- /* we'll re-read the connect request, sans the hello + addr */
- con->in_base_pos = strlen(CEPH_BANNER) +
- sizeof(con->msgr->inst.addr);
-}
-
-
/*
* write as much of pending kvecs to the socket as we can.
}
-/*
- * Read all or part of the accept-side handshake on a newly accepted
- * connection.
- */
-static int read_partial_accept(struct ceph_connection *con)
-{
- int ret;
- int to = 0;
-
- /* banner */
- ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
- if (ret <= 0)
- return ret;
- ret = read_partial(con, &to, sizeof(con->peer_addr), &con->peer_addr);
- if (ret <= 0)
- return ret;
- ret = read_partial(con, &to, sizeof(con->in_connect), &con->in_connect);
- if (ret <= 0)
- return ret;
- return 1;
-}
-
-/*
- * Call after a new connection's handshake has been read.
- */
-static int process_accept(struct ceph_connection *con)
-{
- struct ceph_connection *existing;
- struct ceph_messenger *msgr = con->msgr;
- u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
- u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
- bool retry = true;
- bool replace = false;
-
- dout("process_accept %p got gseq %d cseq %d\n", con,
- peer_gseq, peer_cseq);
-
- if (verify_hello(con) < 0)
- return -1;
-
- /* note flags */
- if (con->in_connect.flags & CEPH_MSG_CONNECT_LOSSY)
- set_bit(LOSSYRX, &con->state);
-
- /* do we have an existing connection for this peer? */
- if (radix_tree_preload(GFP_NOFS) < 0) {
- pr_err("ceph ENOMEM in process_accept\n");
- con->error_msg = "out of memory";
- return -1;
- }
-
- memset(&con->out_reply, 0, sizeof(con->out_reply));
-
- spin_lock(&msgr->con_lock);
- existing = __get_connection(msgr, &con->peer_addr);
- if (existing) {
- if (peer_gseq < existing->peer_global_seq) {
- /* out of order connection attempt */
- con->out_reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
- con->out_reply.global_seq =
- cpu_to_le32(con->peer_global_seq);
- goto reply;
- }
- if (test_bit(LOSSYTX, &existing->state)) {
- dout("process_accept %p replacing LOSSYTX %p\n",
- con, existing);
- replace = true;
- goto accept;
- }
- if (peer_cseq < existing->connect_seq) {
- if (peer_cseq == 0) {
- /* peer reset, then connected to us */
- reset_connection(existing);
- pr_info("reset on %s%d\n", ENTITY_NAME(con->peer_name));
- con->msgr->peer_reset(con->msgr->parent,
- &con->peer_addr,
- &con->peer_name);
- replace = true;
- goto accept;
- }
-
- /* old attempt or peer didn't get the READY */
- con->out_reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
- con->out_reply.connect_seq =
- cpu_to_le32(existing->connect_seq);
- goto reply;
- }
-
- if (peer_cseq == existing->connect_seq) {
- /* connection race */
- dout("process_accept connection race state = %lu\n",
- con->state);
- if (ceph_entity_addr_equal(&msgr->inst.addr,
- &con->peer_addr)) {
- /* incoming connection wins.. */
- replace = true;
- goto accept;
- }
-
- /* our existing outgoing connection wins, tell peer
- to wait for our outging connection to go through */
- con->out_reply.tag = CEPH_MSGR_TAG_WAIT;
- goto reply;
- }
-
- if (existing->connect_seq == 0 &&
- peer_cseq > existing->connect_seq) {
- /* we reset and already reconnecting */
- con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION;
- goto reply;
- }
-
- WARN_ON(le32_to_cpu(con->in_connect.connect_seq) <=
- existing->connect_seq);
- WARN_ON(le32_to_cpu(con->in_connect.global_seq) <
- existing->peer_global_seq);
- if (existing->connect_seq == 0) {
- /* we reset, sending RESETSESSION */
- con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION;
- goto reply;
- }
-
- /* reconnect, replace connection */
- replace = true;
- goto accept;
- }
-
- if (peer_cseq == 0) {
- dout("process_accept no existing connection, opening\n");
- goto accept;
- } else {
- dout("process_accept no existing connection, we reset\n");
- con->out_reply.tag = CEPH_MSGR_TAG_RESETSESSION;
- goto reply;
- }
-
-
-accept:
- /* accept this connection */
- con->connect_seq = peer_cseq + 1;
- con->peer_global_seq = peer_gseq;
- dout("process_accept %p cseq %d peer_gseq %d %s\n", con,
- con->connect_seq, peer_gseq, replace ? "replace" : "new");
-
- con->out_reply.tag = CEPH_MSGR_TAG_READY;
- con->out_reply.global_seq = cpu_to_le32(get_global_seq(con->msgr, 0));
- con->out_reply.connect_seq = cpu_to_le32(peer_cseq + 1);
-
- retry = false;
- prepare_read_tag(con);
-
- /* do this _after_ con is ready to go */
- if (replace)
- __replace_connection(msgr, existing, con);
- else
- __register_connection(msgr, con);
- put_connection(con);
-
-reply:
- if (existing)
- put_connection(existing);
- prepare_write_accept_reply(con, retry);
-
- spin_unlock(&msgr->con_lock);
- radix_tree_preload_end();
-
- ceph_queue_con(con);
- return 0;
-}
-
/*
* read (part of) an ack
*/
more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);
- if (test_bit(ACCEPTING, &con->state)) {
- dout("try_read accepting\n");
- ret = read_partial_accept(con);
- if (ret <= 0)
- goto done;
- if (process_accept(con) < 0) {
- ret = -1;
- goto out;
- }
- goto more;
- }
if (test_bit(CONNECTING, &con->state)) {
dout("try_read connecting\n");
ret = read_partial_connect(con);
}
-/*
- * Handle an incoming connection.
- */
-static void accept_work(struct work_struct *work)
-{
- struct ceph_connection *newcon = NULL;
- struct ceph_messenger *msgr = container_of(work, struct ceph_messenger,
- awork);
-
- /* initialize the msgr connection */
- newcon = new_connection(msgr);
- if (newcon == NULL) {
- pr_err("ceph ENOMEM accepting new connection\n");
- return;
- }
-
- set_bit(ACCEPTING, &newcon->state);
- newcon->connect_seq = 1;
- newcon->in_tag = CEPH_MSGR_TAG_READY; /* eventually, hopefully */
-
- if (ceph_tcp_accept(msgr->listen_sock, newcon) < 0) {
- pr_err("ceph error accepting connection\n");
- put_connection(newcon);
- return;
- }
- dout("accepted connection \n");
-
- prepare_write_accept_hello(msgr, newcon);
- add_connection_accepting(msgr, newcon);
-
- /* queue work explicitly; we may have missed the socket state
- * change before setting the socket callbacks. */
- ceph_queue_con(newcon);
-}
-
-
/*
* create a new messenger instance, creates listening socket
struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
{
struct ceph_messenger *msgr;
- int ret = 0;
msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
if (msgr == NULL)
return ERR_PTR(-ENOMEM);
- INIT_WORK(&msgr->awork, accept_work);
spin_lock_init(&msgr->con_lock);
INIT_LIST_HEAD(&msgr->con_all);
- INIT_LIST_HEAD(&msgr->con_accepting);
INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC);
spin_lock_init(&msgr->global_seq_lock);
}
msgr->inst.addr.ipaddr.sin_family = AF_INET;
- /* create listening socket */
- ret = ceph_tcp_listen(msgr);
- if (ret < 0) {
- kfree(msgr);
- return ERR_PTR(ret);
- }
if (myaddr)
msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr;
- dout("messenger %p listening on %u.%u.%u.%u:%u\n", msgr,
- IPQUADPORT(msgr->inst.addr.ipaddr));
+ dout("messenger_create %p\n", msgr);
return msgr;
}
dout("destroy %p\n", msgr);
- /* stop listener */
- msgr->listen_sock->ops->shutdown(msgr->listen_sock, SHUT_RDWR);
- sock_release(msgr->listen_sock);
- cancel_work_sync(&msgr->awork);
-
/* kill off connections */
spin_lock(&msgr->con_lock);
while (!list_empty(&msgr->con_all)) {