From: Sage Weil Date: Sat, 18 Oct 2008 03:47:07 +0000 (-0700) Subject: kclient: messenger.c cleanup, reorganization, comments X-Git-Tag: v0.5~267 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5424a3375a1c95cdabb6db8d90d04506d16f15f6;p=ceph.git kclient: messenger.c cleanup, reorganization, comments --- diff --git a/src/TODO b/src/TODO index c31a0b2841f3..f7d5ed36cb51 100644 --- a/src/TODO +++ b/src/TODO @@ -47,7 +47,6 @@ snaps on osd - efficient recovery of clones using the clone diff info kernel client -- fix osd client timeout - make osd retry writes if failure after ack.. - clean up cap flush on session close - ACLs diff --git a/src/kernel/decode.h b/src/kernel/decode.h index ef21b69cce38..26c2cd4b0a2c 100644 --- a/src/kernel/decode.h +++ b/src/kernel/decode.h @@ -37,6 +37,7 @@ (*p)++; \ } while (0) +/* decode into an __le## */ #define ceph_decode_64_le(p, v) \ do { \ v = *(__le64*)*(p); \ @@ -59,6 +60,7 @@ *(p) += n; \ } while (0) +/* bounds check too */ #define ceph_decode_64_safe(p, end, v, bad) \ do { \ ceph_decode_need(p, end, sizeof(__u64), bad); \ @@ -99,7 +101,6 @@ /* * encoders */ - #define ceph_encode_64(p, v) \ do { \ *(__le64*)*(p) = cpu_to_le64((v)); \ @@ -124,7 +125,6 @@ /* * filepath, string encoders */ - static __inline__ void ceph_encode_filepath(void **p, void *end, __u64 ino, const char *path) { diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 9fab1017f305..21e97dd8626e 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -27,12 +27,14 @@ static char tag_wait = CEPH_MSGR_TAG_WAIT; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; + +static void ceph_queue_con(struct ceph_connection *con); static void con_work(struct work_struct *); -static void try_accept(struct work_struct *); +static void ceph_fault(struct ceph_connection *con); /* - * workqueue + * work queue for all reading and writing to/from the socket. 
*/ struct workqueue_struct *ceph_msgr_wq; @@ -58,7 +60,7 @@ void ceph_msgr_exit(void) * socket callback functions */ -/* listen socket received a connect */ +/* listen socket received a connection */ static void ceph_accept_ready(struct sock *sk, int count_unused) { struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data; @@ -69,37 +71,36 @@ static void ceph_accept_ready(struct sock *sk, int count_unused) queue_work(ceph_msgr_wq, &msgr->awork); } -/* Data available on socket or listen socket received a connect */ +/* data available on socket, or listen socket received a connect */ static void ceph_data_ready(struct sock *sk, int count_unused) { struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; if (sk->sk_state != TCP_CLOSE_WAIT) { - dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n", + dout(30, "ceph_data_ready on %p state = %lu, queueing work\n", con, con->state); ceph_queue_con(con); } } -/* socket has bufferspace for writing */ +/* socket has buffer space for writing */ static void ceph_write_space(struct sock *sk) { struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data; - dout(30, "ceph_write_space %p state = %lu\n", con, con->state); - - /* only queue to workqueue if a WRITE is pending */ + /* only queue to workqueue if there is data we want to write. 
*/ if (test_bit(WRITE_PENDING, &con->state)) { - dout(30, "ceph_write_space %p queuing write work\n", con); + dout(30, "ceph_write_space %p queueing write work\n", con); ceph_queue_con(con); - } + } else + dout(30, "ceph_write_space %p nothing to write\n", con); - /* Since we have our own write_space, Clear the SOCK_NOSPACE flag */ + /* since we have our own write_space, clear the SOCK_NOSPACE flag */ clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); } -/* sockets state has change */ +/* socket's state has changed */ static void ceph_state_change(struct sock *sk) { struct ceph_connection *con = @@ -125,12 +126,14 @@ static void ceph_state_change(struct sock *sk) break; case TCP_ESTABLISHED: dout(30, "ceph_state_change TCP_ESTABLISHED\n"); - ceph_write_space(sk); + ceph_queue_con(con); break; } } -/* make a listening socket active by setting up the data ready call back */ +/* + * set up socket callbacks + */ static void listen_sock_callbacks(struct socket *sock, struct ceph_messenger *msgr) { @@ -139,7 +142,6 @@ static void listen_sock_callbacks(struct socket *sock, sk->sk_data_ready = ceph_accept_ready; } -/* make a socket active by setting up the call back functions */ static void set_sock_callbacks(struct socket *sock, struct ceph_connection *con) { @@ -160,35 +162,36 @@ static void set_sock_callbacks(struct socket *sock, */ static struct socket *ceph_tcp_connect(struct ceph_connection *con) { - int ret; struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; struct socket *sock; + int ret; + BUG_ON(con->sock); ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) return ERR_PTR(ret); - con->sock = sock; sock->sk->sk_allocation = GFP_NOFS; set_sock_callbacks(sock, con); - dout (20, "connect %u.%u.%u.%u:%u\n", - IPQUADPORT(*(struct sockaddr_in *)paddr)); + dout(20, "connect %u.%u.%u.%u:%u\n", + IPQUADPORT(*(struct sockaddr_in *)paddr)); ret = sock->ops->connect(sock, paddr, sizeof(struct sockaddr_in), O_NONBLOCK); if (ret == 
-EINPROGRESS) { - dout(20, "connect EINPROGRESS sk_state = = %u\n", + dout(20, "connect %u.%u.%u.%u:%u EINPROGRESS sk_state = %u\n", + IPQUADPORT(*(struct sockaddr_in *)paddr), sock->sk->sk_state); ret = 0; } if (ret < 0) { - /* TBD check for fatal errors, retry if not fatal.. */ - derr(1, "connect %u.%u.%u.%u:%u error: %d\n", + derr(1, "connect %u.%u.%u.%u:%u error %d\n", IPQUADPORT(*(struct sockaddr_in *)paddr), ret); sock_release(sock); con->sock = NULL; + con->error_msg = "connect error"; } if (ret < 0) @@ -197,7 +200,7 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con) } /* - * setup listening socket + * set up listening socket */ static int ceph_tcp_listen(struct ceph_messenger *msgr) { @@ -210,13 +213,11 @@ static int ceph_tcp_listen(struct ceph_messenger *msgr) ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) return ret; - sock->sk->sk_allocation = GFP_NOFS; - ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&optval, sizeof(optval)); if (ret < 0) { - derr(0, "Failed to set SO_REUSEADDR: %d\n", ret); + derr(0, "failed to set SO_REUSEADDR: %d\n", ret); goto err; } @@ -235,26 +236,15 @@ static int ceph_tcp_listen(struct ceph_messenger *msgr) derr(0, "failed to getsockname: %d\n", ret); goto err; } - dout(10, "listen on port %d\n", ntohs(myaddr->sin_port)); - - ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, - (char *)&optval, sizeof(optval)); - if (ret < 0) { - derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret); - goto err; - } + dout(0, "listening on %u.%u.%u.%u:%u\n", IPQUADPORT(*myaddr)); - /* TBD: probaby want to tune the backlog queue .. */ - ret = sock->ops->listen(sock, CEPH_MSGR_BACKUP); - if (ret < 0) { - derr(0, "kernel_listen error: %d\n", ret); - goto err; - } + /* we don't care too much if this works or not */ + sock->ops->listen(sock, CEPH_MSGR_BACKUP); /* ok! 
*/ msgr->listen_sock = sock; listen_sock_callbacks(sock, msgr); - return ret; + return 0; err: sock_release(sock); @@ -262,20 +252,17 @@ err: } /* - * accept a connection + * accept a connection */ static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con) { - int ret; - struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; - int len; struct socket *sock; + int ret; ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) return ret; con->sock = sock; - sock->sk->sk_allocation = GFP_NOFS; ret = lsock->ops->accept(lsock, sock, O_NONBLOCK); @@ -286,15 +273,7 @@ static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con) sock->ops = lsock->ops; sock->type = lsock->type; - ret = sock->ops->getname(sock, paddr, &len, 2); - if (ret < 0) { - derr(0, "getname error: %d\n", ret); - goto err; - } - - /* setup callbacks */ set_sock_callbacks(sock, con); - return ret; err: @@ -303,85 +282,74 @@ err: return ret; } -/* - * receive a message this may return after partial send - */ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) { struct kvec iov = {buf, len}; - struct msghdr msg = {.msg_flags = 0}; - int rlen = 0; /* length read */ + struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; - rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); - return(rlen); + return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); } - /* - * Send a message this may return after partial send + * write something. @more is true if caller will be sending more data + * shortly. 
*/ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, size_t kvlen, size_t len, int more) { - struct msghdr msg = {.msg_flags = 0}; - int rlen = 0; + struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; if (more) msg.msg_flags |= MSG_MORE; else msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ - rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len); - - return(rlen); + return kernel_sendmsg(sock, &msg, iov, kvlen, len); } - /* * create a new connection. */ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) { struct ceph_connection *con; + con = kzalloc(sizeof(struct ceph_connection), GFP_NOFS); if (con == NULL) return NULL; - con->msgr = msgr; atomic_set(&con->nref, 1); - INIT_LIST_HEAD(&con->list_all); INIT_LIST_HEAD(&con->list_bucket); - spin_lock_init(&con->out_queue_lock); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); - INIT_DELAYED_WORK(&con->work, con_work); - return con; } /* - * the radix_tree has an unsigned long key and void * value. since - * ceph_entity_addr is bigger than that, we use a trivial hash key, and - * point to a list_head in ceph_connection, as you would with a hash - * table. in the rare event that the trivial hash collides, we just - * traverse the (short) list. + * The con_tree radix_tree has an unsigned long key and void * value. + * Since ceph_entity_addr is bigger than that, we use a trivial hash + * key, and point to a list_head in ceph_connection, as you would with + * a hash table. If the trivial hash collides, we just traverse the + * (hopefully short) list until we find what we want. */ static unsigned long hash_addr(struct ceph_entity_addr *addr) { unsigned long key; + key = *(__u32 *)&addr->ipaddr.sin_addr.s_addr; key ^= *(__u16 *)&addr->ipaddr.sin_port; return key; } /* - * get an existing connection, if any, for given addr + * Get an existing connection, if any, for given addr. 
Note that we + * may need to traverse the list_bucket list, which has to "head." + * + * called under con_lock. */ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) @@ -390,7 +358,6 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct list_head *head, *p; unsigned long key = hash_addr(addr); - /* existing? */ head = radix_tree_lookup(&msgr->con_tree, key); if (head == NULL) return NULL; @@ -402,7 +369,6 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) goto yes; } - return NULL; yes: @@ -414,18 +380,18 @@ yes: /* - * close connection socket + * Shutdown/close the socket for the given connection. */ static int con_close_socket(struct ceph_connection *con) { int rc; + dout(10, "con_close_socket on %p sock %p\n", con, con->sock); if (!con->sock) return 0; rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); con->sock = NULL; - return rc; } @@ -447,20 +413,23 @@ static void put_connection(struct ceph_connection *con) } /* - * add to connections tree + * add a connection to the con_tree. + * + * called under con_lock. */ static int __register_connection(struct ceph_messenger *msgr, - struct ceph_connection *con) + struct ceph_connection *con) { struct list_head *head; unsigned long key = hash_addr(&con->peer_addr); int rc = 0; - /* inc ref count */ + dout(20, "register_connection %p %d -> %d\n", con, + atomic_read(&con->nref), atomic_read(&con->nref) + 1); atomic_inc(&con->nref); - dout(20, "add_connection %p %d -> %d\n", con, - atomic_read(&con->nref) - 1, atomic_read(&con->nref)); + /* if were just ACCEPTING this connection, it is already on the + * con_all and con_accepting lists. 
*/ if (test_and_clear_bit(ACCEPTING, &con->state)) { list_del_init(&con->list_bucket); put_connection(con); @@ -469,25 +438,27 @@ static int __register_connection(struct ceph_messenger *msgr, head = radix_tree_lookup(&msgr->con_tree, key); if (head) { - dout(20, "add_connection %p in existing bucket %lu head %p\n", + dout(20, "register_connection %p in old bucket %lu head %p\n", con, key, head); list_add(&con->list_bucket, head); } else { - dout(20, "add_connection %p in new bucket %lu head %p\n", con, - key, &con->list_bucket); - INIT_LIST_HEAD(&con->list_bucket); /* empty */ + dout(20, "register_connection %p in new bucket %lu head %p\n", + con, key, &con->list_bucket); + INIT_LIST_HEAD(&con->list_bucket); /* empty */ rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket); - if (rc < 0) { list_del(&con->list_all); + put_connection(con); return rc; } } set_bit(REGISTERED, &con->state); - return 0; } +/* + * called under con_lock. + */ static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con) { @@ -500,8 +471,10 @@ static void add_connection_accepting(struct ceph_messenger *msgr, } /* - * remove connection from all list. - * also, from con_tree radix tree, if it should have been there + * Remove connection from all list. Also, from con_tree, if it should + * have been there. + * + * called under con_lock. */ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con) @@ -516,14 +489,15 @@ static void __remove_connection(struct ceph_messenger *msgr, } list_del_init(&con->list_all); if (test_bit(REGISTERED, &con->state)) { - /* remove from con_tree too */ key = hash_addr(&con->peer_addr); if (list_empty(&con->list_bucket)) { - /* last one */ + /* last one in this bucket */ dout(20, "__remove_connection %p and bucket %lu\n", con, key); radix_tree_delete(&msgr->con_tree, key); } else { + /* if we share this bucket, and the radix tree points + * to us, adjust it to point to the next guy. 
*/ slot = radix_tree_lookup_slot(&msgr->con_tree, key); val = radix_tree_deref_slot(slot); dout(20, "__remove_connection %p from bucket %lu " @@ -538,7 +512,7 @@ static void __remove_connection(struct ceph_messenger *msgr, list_del_init(&con->list_bucket); } } - if (test_bit(ACCEPTING, &con->state)) + if (test_and_clear_bit(ACCEPTING, &con->state)) list_del_init(&con->list_bucket); put_connection(con); } @@ -551,248 +525,111 @@ static void remove_connection(struct ceph_messenger *msgr, spin_unlock(&msgr->con_lock); } - -/* - * atomically queue work on a connection. bump reference to avoid - * races with connection teardown. - */ -void ceph_queue_con(struct ceph_connection *con) -{ - if (test_bit(WAIT, &con->state) || - test_bit(CLOSED, &con->state) || - test_bit(BACKOFF, &con->state)) { - dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED|BACKOFF\n", - con); - return; - } - - atomic_inc(&con->nref); - dout(40, "ceph_queue_con %p %d -> %d\n", con, - atomic_read(&con->nref) - 1, atomic_read(&con->nref)); - - set_bit(QUEUED, &con->state); - if (test_bit(BUSY, &con->state) || - !queue_work(ceph_msgr_wq, &con->work.work)) { - dout(40, "ceph_queue_con %p - already BUSY or queued\n", con); - put_connection(con); - } -} - - /* - * failure case - * A retry mechanism is used with exponential backoff + * replace another connection + * (old and new should be for the _same_ peer, + * and thus in the same bucket in the radix tree) */ -static void ceph_fault(struct ceph_connection *con) +static void __replace_connection(struct ceph_messenger *msgr, + struct ceph_connection *old, + struct ceph_connection *new) { - derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name), - IPQUADPORT(con->peer_addr.ipaddr), con->error_msg); - dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n", - con, con->state, IPQUADPORT(con->peer_addr.ipaddr)); + /* replace in con_tree */ + if (list_empty(&old->list_bucket)) { + /* oh, just replace old with new in bucket list */ + 
list_add(&new->list_bucket, &old->list_bucket); + list_del_init(&old->list_bucket); + } else { + unsigned long key = hash_addr(&new->peer_addr); + void **slot; - if (test_bit(LOSSYTX, &con->state)) { - dout(30, "fault on LOSSYTX channel\n"); - remove_connection(con->msgr, con); - return; + slot = radix_tree_lookup_slot(&msgr->con_tree, key); + BUG_ON(radix_tree_deref_slot(slot) != &old->list_bucket); + radix_tree_replace_slot(slot, &new->list_bucket); } - con_close_socket(con); - - /* hmm? */ - BUG_ON(test_bit(WAIT, &con->state)); + /* take old connections message queue */ + spin_lock(&old->out_queue_lock); + if (!list_empty(&old->out_queue)) + list_splice_init(&new->out_queue, &old->out_queue); + spin_unlock(&old->out_queue_lock); - /* - * If there are no messages in the queue, place the - * connection in a STANDBY state. otherwise, retry with - * delay - */ - spin_lock(&con->out_queue_lock); - if (list_empty(&con->out_queue)) { - dout(10, "fault setting STANDBY\n"); - set_bit(STANDBY, &con->state); - spin_unlock(&con->out_queue_lock); - return; - } + new->connect_seq = le32_to_cpu(new->in_connect.connect_seq); + new->out_seq = old->out_seq; - dout(10, "fault setting BACKOFF\n"); - set_bit(BACKOFF, &con->state); + set_bit(CLOSED, &old->state); + put_connection(old); /* dec reference count */ - if (con->delay == 0) - con->delay = BASE_DELAY_INTERVAL; - else if (con->delay < MAX_DELAY_INTERVAL) - con->delay *= 2; + clear_bit(ACCEPTING, &new->state); +} - atomic_inc(&con->nref); - dout(40, "fault queueing %p %d -> %d delay %lu\n", con, - atomic_read(&con->nref) - 1, atomic_read(&con->nref), - con->delay); - queue_delayed_work(ceph_msgr_wq, &con->work, - round_jiffies_relative(con->delay)); - list_splice_init(&con->out_sent, &con->out_queue); - spin_unlock(&con->out_queue_lock); -} -/* - * non-blocking versions - * - * these should be called while holding con->con_lock - */ /* - * write as much of con->out_partial to the socket as we can. 
- * 1 -> done - * 0 -> socket full, but more to do - * <0 -> error + * We maintain a global counter to order connection attempts. Get + * a unique seq greater than @gt. */ -static int write_partial_kvec(struct ceph_connection *con) +static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) { - int ret; + u32 ret; - dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes); - while (con->out_kvec_bytes > 0) { - ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, - con->out_kvec_left, con->out_kvec_bytes, - con->out_more); - if (ret <= 0) - goto out; - con->out_kvec_bytes -= ret; - if (con->out_kvec_bytes == 0) - break; /* done */ - while (ret > 0) { - if (ret >= con->out_kvec_cur->iov_len) { - ret -= con->out_kvec_cur->iov_len; - con->out_kvec_cur++; - con->out_kvec_left--; - } else { - con->out_kvec_cur->iov_len -= ret; - con->out_kvec_cur->iov_base += ret; - ret = 0; - break; - } - } - } - con->out_kvec_left = 0; - ret = 1; -out: - dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con, - con->out_kvec_left, con->out_kvec_bytes, ret); - return ret; /* done! 
*/ + spin_lock(&msgr->global_seq_lock); + if (msgr->global_seq < gt) + msgr->global_seq = gt; + ret = ++msgr->global_seq; + spin_unlock(&msgr->global_seq_lock); + return ret; } -static int write_partial_msg_pages(struct ceph_connection *con, - struct ceph_msg *msg) -{ - int ret; - unsigned data_len = le32_to_cpu(msg->hdr.data_len); - struct ceph_client *client = con->msgr->parent; - int crc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC); - size_t len; - - dout(30, "write_partial_msg_pages con %p msg %p on %d/%d offset %d\n", - con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, - con->out_msg_pos.page_pos); - - while (con->out_msg_pos.page < con->out_msg->nr_pages) { - struct page *page = NULL; - void *kaddr = NULL; - - mutex_lock(&msg->page_mutex); - if (msg->pages) { - page = msg->pages[con->out_msg_pos.page]; - if (crc) - kaddr = kmap(page); - } else { - /*dout(60, "using zero page\n");*/ - if (crc) - kaddr = page_address(con->msgr->zero_page); - } - len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), - (int)(data_len - con->out_msg_pos.data_pos)); - if (crc && !con->out_msg_pos.did_page_crc) { - void *base = kaddr + con->out_msg_pos.page_pos; - - con->out_msg->footer.data_crc = - cpu_to_le32(crc32c_le(le32_to_cpu(con->out_msg->footer.data_crc), - base, len)); - con->out_msg_pos.did_page_crc = 1; - } - - if (msg->pages) - ret = kernel_sendpage(con->sock, page, - con->out_msg_pos.page_pos, len, - MSG_DONTWAIT | MSG_NOSIGNAL | - MSG_MORE); - else - ret = kernel_sendpage(con->sock, con->msgr->zero_page, - con->out_msg_pos.page_pos, len, - MSG_DONTWAIT | MSG_NOSIGNAL | - MSG_MORE); - - if (crc && msg->pages) - kunmap(page); - - mutex_unlock(&msg->page_mutex); - if (ret <= 0) - goto out; - con->out_msg_pos.data_pos += ret; - con->out_msg_pos.page_pos += ret; - if (ret == len) { - con->out_msg_pos.page_pos = 0; - con->out_msg_pos.page++; - con->out_msg_pos.did_page_crc = 0; - } - } - /* done with data pages */ - dout(30, "write_partial_msg_pages 
wrote all pages on %p\n", con); - - /* queue up footer, too */ - if (!crc) - con->out_msg->footer.flags |= cpu_to_le32(CEPH_MSG_FOOTER_NOCRC); - con->out_kvec[0].iov_base = &con->out_msg->footer; - con->out_kvec_bytes = con->out_kvec[0].iov_len = - sizeof(con->out_msg->footer); - con->out_kvec_left = 1; - con->out_kvec_cur = con->out_kvec; - con->out_msg = NULL; - con->out_more = 0; /* end of message */ - ret = 1; -out: - return ret; +/* + * Prepare footer for currently outgoing message, and finish things + * off. Assumes out_kvec* are already valid.. we just add on to the end. + */ +static void prepare_write_message_footer(struct ceph_connection *con, int v) +{ + struct ceph_msg *m = con->out_msg; + + con->out_kvec[v].iov_base = &m->footer; + con->out_kvec[v].iov_len = sizeof(m->footer); + con->out_kvec_bytes += sizeof(m->footer); + con->out_kvec_left++; + con->out_msg = NULL; /* we're done with this one */ + con->out_more = 0; /* end of message */ } - /* - * build out_partial based on the next outgoing message in the queue. + * Prepare headers for the next outgoing message. */ static void prepare_write_message(struct ceph_connection *con) { struct ceph_msg *m; int v = 0; + con->out_kvec_bytes = 0; - /* ack? */ + /* Sneak an ack in there first? If we can get it into the same + * TCP packet that's a good thing. 
*/ if (con->in_seq > con->in_seq_acked) { con->in_seq_acked = con->in_seq; con->out_kvec[v].iov_base = &tag_ack; con->out_kvec[v++].iov_len = 1; - con->out32 = cpu_to_le32(con->in_seq_acked); - con->out_kvec[v].iov_base = &con->out32; + con->out_temp_ack = cpu_to_le32(con->in_seq_acked); + con->out_kvec[v].iov_base = &con->out_temp_ack; con->out_kvec[v++].iov_len = 4; con->out_kvec_bytes = 1 + 4; } - /* move to sending/sent list */ + /* move message to sending/sent list */ m = list_entry(con->out_queue.next, struct ceph_msg, list_head); list_del_init(&m->list_head); list_add_tail(&m->list_head, &con->out_sent); - con->out_msg = m; /* we dont bother taking a reference here. */ + con->out_msg = m; /* we don't bother taking a reference here. */ - /* encode header */ dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n", m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type), le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len), @@ -808,27 +645,35 @@ static void prepare_write_message(struct ceph_connection *con) con->out_kvec_left = v; con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len; con->out_kvec_cur = con->out_kvec; - con->out_more = 1; /* data? */ - - /* pages */ - con->out_msg_pos.page = 0; - con->out_msg_pos.page_pos = le32_to_cpu(m->hdr.data_off) & ~PAGE_MASK; - con->out_msg_pos.data_pos = 0; - con->out_msg_pos.did_page_crc = 0; /* fill in crc (except data pages), footer */ - con->out_msg->hdr.crc = cpu_to_le32(crc32c_le(0, (void *)&m->hdr, - sizeof(m->hdr) - sizeof(m->hdr.crc))); + con->out_msg->hdr.crc = + cpu_to_le32(crc32c_le(0, (void *)&m->hdr, + sizeof(m->hdr) - sizeof(m->hdr.crc))); con->out_msg->footer.flags = 0; - con->out_msg->footer.front_crc = cpu_to_le32(crc32c_le(0, m->front.iov_base, - m->front.iov_len)); + con->out_msg->footer.front_crc = + cpu_to_le32(crc32c_le(0, m->front.iov_base, m->front.iov_len)); con->out_msg->footer.data_crc = 0; + /* is there a data payload? 
*/ + if (le32_to_cpu(m->hdr.data_len) == 0) { + /* initialize page iterator */ + con->out_msg_pos.page = 0; + con->out_msg_pos.page_pos = + le32_to_cpu(m->hdr.data_off) & ~PAGE_MASK; + con->out_msg_pos.data_pos = 0; + con->out_msg_pos.did_page_crc = 0; + con->out_more = 1; /* data + footer will follow */ + } else { + /* no, queue up footer too and be done */ + prepare_write_message_footer(con, v); + } + set_bit(WRITE_PENDING, &con->state); } /* - * prepare an ack for send + * Prepare an ack. */ static void prepare_write_ack(struct ceph_connection *con) { @@ -838,8 +683,8 @@ static void prepare_write_ack(struct ceph_connection *con) con->out_kvec[0].iov_base = &tag_ack; con->out_kvec[0].iov_len = 1; - con->out32 = cpu_to_le32(con->in_seq_acked); - con->out_kvec[1].iov_base = &con->out32; + con->out_temp_ack = cpu_to_le32(con->in_seq_acked); + con->out_kvec[1].iov_base = &con->out_temp_ack; con->out_kvec[1].iov_len = 4; con->out_kvec_left = 2; con->out_kvec_bytes = 1 + 4; @@ -848,25 +693,18 @@ static void prepare_write_ack(struct ceph_connection *con) set_bit(WRITE_PENDING, &con->state); } -static void prepare_read_connect(struct ceph_connection *con) -{ - con->in_base_pos = 0; -} - -static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) -{ - u32 ret; - spin_lock(&msgr->global_seq_lock); - if (msgr->global_seq < gt) - msgr->global_seq = gt; - ret = ++msgr->global_seq; - spin_unlock(&msgr->global_seq_lock); - return ret; -} +/* + * Connection negotiation. + */ +/* + * We connected to a peer and are saying hello. 
+ */ static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con) { + int len = strlen(CEPH_BANNER); + con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(con->global_seq); con->out_connect.flags = 0; @@ -874,14 +712,13 @@ static void prepare_write_connect(struct ceph_messenger *msgr, con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX; con->out_kvec[0].iov_base = CEPH_BANNER; - con->out_kvec[0].iov_len = strlen(CEPH_BANNER); + con->out_kvec[0].iov_len = len; con->out_kvec[1].iov_base = &msgr->inst.addr; con->out_kvec[1].iov_len = sizeof(msgr->inst.addr); con->out_kvec[2].iov_base = &con->out_connect; con->out_kvec[2].iov_len = sizeof(con->out_connect); con->out_kvec_left = 3; - con->out_kvec_bytes = strlen(CEPH_BANNER) + - sizeof(msgr->inst.addr) + + con->out_kvec_bytes = len + sizeof(msgr->inst.addr) + sizeof(con->out_connect); con->out_kvec_cur = con->out_kvec; con->out_more = 0; @@ -903,6 +740,9 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr, set_bit(WRITE_PENDING, &con->state); } +/* + * We accepted a connection and are saying hello. + */ static void prepare_write_accept_hello(struct ceph_messenger *msgr, struct ceph_connection *con) { @@ -919,7 +759,10 @@ static void prepare_write_accept_hello(struct ceph_messenger *msgr, set_bit(WRITE_PENDING, &con->state); } -static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag) +/* + * Write a single protocol control message (byte). + */ +static void prepare_write_tag(struct ceph_connection *con, char *ptag) { con->out_kvec[0].iov_base = ptag; con->out_kvec[0].iov_len = 1; @@ -930,6 +773,9 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag) set_bit(WRITE_PENDING, &con->state); } +/* + * Negotiation succeeded on an incoming connection, tell the peer. 
+ */ static void prepare_write_accept_ready(struct ceph_connection *con) { con->out_connect.flags = 0; @@ -947,6 +793,10 @@ static void prepare_write_accept_ready(struct ceph_connection *con) set_bit(WRITE_PENDING, &con->state); } +/* + * The connecting peer needs to try again with a larger connect_seq or + * global_seq (as indicated by *ptag). + */ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag, __le32 *pseq) { @@ -960,384 +810,221 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag, con->out_more = 0; set_bit(WRITE_PENDING, &con->state); - /* we'll re-read the connect request, but not the hello */ + /* we'll re-read the connect request, sans the hello + addr */ con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->msgr->inst.addr); } + + + /* - * worker function when socket is writeable + * write as much of pending kvecs to the socket as we can. + * 1 -> done + * 0 -> socket full, but more to do + * <0 -> error */ -static int try_write(struct ceph_connection *con) +static int write_partial_kvec(struct ceph_connection *con) { - struct ceph_messenger *msgr = con->msgr; - int ret = 1; - - dout(30, "try_write start %p state %lu nref %d\n", con, con->state, - atomic_read(&con->nref)); - -more: - dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes); + int ret; - /* initiate connect? 
*/ - if (con->sock == NULL) { - if (test_and_clear_bit(STANDBY, &con->state)) - con->connect_seq++; - con->global_seq = get_global_seq(msgr, 0); - prepare_write_connect(msgr, con); - prepare_read_connect(con); - set_bit(CONNECTING, &con->state); - con->in_tag = CEPH_MSGR_TAG_READY; - dout(5, "try_write initiating connect on %p new state %lu\n", - con, con->state); - BUG_ON(con->sock); - con->sock = ceph_tcp_connect(con); - dout(10, "tcp_connect returned %p\n", con->sock); - if (IS_ERR(con->sock)) { - con->sock = NULL; - con->error_msg = "connect error"; - ret = -1; + dout(10, "write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); + while (con->out_kvec_bytes > 0) { + ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, + con->out_kvec_left, con->out_kvec_bytes, + con->out_more); + if (ret <= 0) goto out; + con->out_kvec_bytes -= ret; + if (con->out_kvec_bytes == 0) + break; /* done */ + while (ret > 0) { + if (ret >= con->out_kvec_cur->iov_len) { + ret -= con->out_kvec_cur->iov_len; + con->out_kvec_cur++; + con->out_kvec_left--; + } else { + con->out_kvec_cur->iov_len -= ret; + con->out_kvec_cur->iov_base += ret; + ret = 0; + break; + } } } + con->out_kvec_left = 0; + ret = 1; +out: + dout(30, "write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, + con->out_kvec_bytes, con->out_kvec_left, ret); + return ret; /* done! */ +} - /* kvec data queued? */ -more_kvec: - if (con->out_kvec_left) { - ret = write_partial_kvec(con); - if (ret == 0) - goto done; - if (ret < 0) { - dout(30, "try_write write_partial_kvec err %d\n", ret); - goto done; +/* + * Write as much message data payload as we can. If we finish, queue + * up the footer. + * 1 -> done, footer is now queued in out_kvec[]. 
+ * 0 -> socket full, but more to do + * <0 -> error + */ +static int write_partial_msg_pages(struct ceph_connection *con) +{ + struct ceph_client *client = con->msgr->parent; + struct ceph_msg *msg = con->out_msg; + unsigned data_len = le32_to_cpu(msg->hdr.data_len); + size_t len; + int crc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC); + int ret; + + dout(30, "write_partial_msg_pages %p msg %p page %d/%d offset %d\n", + con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, + con->out_msg_pos.page_pos); + + while (con->out_msg_pos.page < con->out_msg->nr_pages) { + struct page *page = NULL; + void *kaddr = NULL; + + /* + * if we are calculating the data crc (the default), we need + * to map the page. if our pages[] has been revoked, use the + * zero page. + */ + mutex_lock(&msg->page_mutex); + if (msg->pages) { + page = msg->pages[con->out_msg_pos.page]; + if (crc) + kaddr = kmap(page); + } else { + page = con->msgr->zero_page; + if (crc) + kaddr = page_address(con->msgr->zero_page); } - } + len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), + (int)(data_len - con->out_msg_pos.data_pos)); + if (crc && !con->out_msg_pos.did_page_crc) { + void *base = kaddr + con->out_msg_pos.page_pos; + u32 crc = le32_to_cpu(con->out_msg->footer.data_crc); - /* msg pages? */ - if (con->out_msg) { - ret = write_partial_msg_pages(con, con->out_msg); - if (ret == 1) - goto more_kvec; - if (ret == 0) - goto done; - if (ret < 0) { - dout(30, "try_write write_partial_msg_pages err %d\n", - ret); - goto done; + con->out_msg->footer.data_crc = + cpu_to_le32(crc32c_le(crc, base, len)); + con->out_msg_pos.did_page_crc = 1; } - } - con->out_msg = NULL; /* done with this message. */ - /* anything else pending? */ - spin_lock(&con->out_queue_lock); - if (!list_empty(&con->out_queue)) { - prepare_write_message(con); - } else if (con->in_seq > con->in_seq_acked) { - prepare_write_ack(con); - } else { - clear_bit(WRITE_PENDING, &con->state); - /* hmm, nothing to do! 
No more writes pending? */ - dout(30, "try_write nothing else to write.\n"); - spin_unlock(&con->out_queue_lock); - goto done; + ret = kernel_sendpage(con->sock, page, + con->out_msg_pos.page_pos, len, + MSG_DONTWAIT | MSG_NOSIGNAL | + MSG_MORE); + + if (crc && msg->pages) + kunmap(page); + + mutex_unlock(&msg->page_mutex); + if (ret <= 0) + goto out; + + con->out_msg_pos.data_pos += ret; + con->out_msg_pos.page_pos += ret; + if (ret == len) { + con->out_msg_pos.page_pos = 0; + con->out_msg_pos.page++; + con->out_msg_pos.did_page_crc = 0; + } } - spin_unlock(&con->out_queue_lock); - goto more; -done: - ret = 0; + dout(30, "write_partial_msg_pages %p msg %p done\n", con, msg); + + /* prepare and queue up footer, too */ + if (!crc) + con->out_msg->footer.flags |= + cpu_to_le32(CEPH_MSG_FOOTER_NOCRC); + con->out_kvec_bytes = 0; + con->out_kvec_left = 0; + con->out_kvec_cur = con->out_kvec; + prepare_write_message_footer(con, 0); + ret = 1; out: - dout(30, "try_write done on %p\n", con); return ret; } + + +/* + * Prepare to read connection handshake, or an ack. + */ +static void prepare_read_connect(struct ceph_connection *con) +{ + con->in_base_pos = 0; +} + +static void prepare_read_ack(struct ceph_connection *con) +{ + con->in_base_pos = 0; +} + /* - * prepare to read a message + * Prepare to read a message. */ static int prepare_read_message(struct ceph_connection *con) { int err; + BUG_ON(con->in_msg != NULL); con->in_base_pos = 0; con->in_msg = ceph_msg_new(0, 0, 0, 0, NULL); if (IS_ERR(con->in_msg)) { - /* TBD: we don't check for error in caller, handle it here? 
*/ err = PTR_ERR(con->in_msg); con->in_msg = NULL; - derr(1, "kmalloc failure on incoming message %d\n", err); + con->error_msg = "out of memory for incoming message"; return err; } con->in_front_crc = con->in_data_crc = 0; return 0; } + + /* - * read (part of) a message + * Read all or part of the connect-side handshake on a new connection */ -static int read_message_partial(struct ceph_connection *con) +static int read_partial_connect(struct ceph_connection *con) { - struct ceph_msg *m = con->in_msg; - void *p; - int ret; - int to, want, left; - unsigned front_len, data_len, data_off; + int ret, to; + dout(20, "read_partial_connect %p at %d\n", con, con->in_base_pos); - dout(20, "read_message_partial con %p msg %p\n", con, m); + /* peer's banner */ + to = strlen(CEPH_BANNER); + while (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = con->in_base_pos; + ret = ceph_tcp_recvmsg(con->sock, + (char *)&con->in_banner + have, + left); + if (ret <= 0) + goto out; + con->in_base_pos += ret; + } - /* header */ - while (con->in_base_pos < sizeof(m->hdr)) { - left = sizeof(m->hdr) - con->in_base_pos; + /* peer's addr */ + to += sizeof(con->actual_peer_addr); + while (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = sizeof(con->actual_peer_addr) - left; ret = ceph_tcp_recvmsg(con->sock, - (char *)&m->hdr + con->in_base_pos, + (char *)&con->actual_peer_addr + have, left); if (ret <= 0) - return ret; + goto out; con->in_base_pos += ret; - if (con->in_base_pos == sizeof(m->hdr)) { - u32 crc = crc32c_le(0, (void *)&m->hdr, - sizeof(m->hdr) - sizeof(m->hdr.crc)); - if (crc != le32_to_cpu(m->hdr.crc)) { - derr(0, "read_message_partial %p bad hdr crc" - " %u != expected %u\n", - m, crc, m->hdr.crc); - return -EIO; - } - } } - /* front */ - front_len = le32_to_cpu(m->hdr.front_len); - while (m->front.iov_len < front_len) { - if (m->front.iov_base == NULL) { - m->front.iov_base = kmalloc(front_len, GFP_NOFS); - if (m->front.iov_base == 
NULL) - return -ENOMEM; - } - left = front_len - m->front.iov_len; - ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + - m->front.iov_len, left); + /* in_tag */ + to += 1; + if (con->in_base_pos < to) { + ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); if (ret <= 0) - return ret; - m->front.iov_len += ret; - if (m->front.iov_len == front_len) - con->in_front_crc = crc32c_le(0, m->front.iov_base, - m->front.iov_len); - } - - /* (page) data */ - data_len = le32_to_cpu(m->hdr.data_len); - data_off = le32_to_cpu(m->hdr.data_off); - if (data_len == 0) - goto no_data; - if (m->nr_pages == 0) { - con->in_msg_pos.page = 0; - con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; - con->in_msg_pos.data_pos = 0; - /* find pages for data payload */ - want = calc_pages_for(data_off & ~PAGE_MASK, data_len); - ret = 0; - BUG_ON(!con->msgr->prepare_pages); - ret = con->msgr->prepare_pages(con->msgr->parent, m, want); - if (ret < 0) { - dout(10, "prepare_pages failed, skipping payload\n"); - con->in_base_pos = -data_len - sizeof(m->footer); - ceph_msg_put(con->in_msg); - con->in_msg = NULL; - con->in_tag = CEPH_MSGR_TAG_READY; - return 0; - } else { - BUG_ON(m->nr_pages < want); - } - } - while (con->in_msg_pos.data_pos < data_len) { - left = min((int)(data_len - con->in_msg_pos.data_pos), - (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); - mutex_lock(&m->page_mutex); - if (!m->pages) { - dout(10, "pages revoked during msg read\n"); - mutex_unlock(&m->page_mutex); - con->in_base_pos = con->in_msg_pos.data_pos - data_len - - sizeof(m->footer); - ceph_msg_put(m); - con->in_msg = NULL; - con->in_tag = CEPH_MSGR_TAG_READY; - return 0; - } - p = kmap(m->pages[con->in_msg_pos.page]); - ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, - left); - if (ret > 0) - con->in_data_crc = - crc32c_le(con->in_data_crc, - p + con->in_msg_pos.page_pos, ret); - kunmap(m->pages[con->in_msg_pos.page]); - mutex_unlock(&m->page_mutex); - if (ret <= 0) - return ret; - 
con->in_msg_pos.data_pos += ret; - con->in_msg_pos.page_pos += ret; - if (con->in_msg_pos.page_pos == PAGE_SIZE) { - con->in_msg_pos.page_pos = 0; - con->in_msg_pos.page++; - } - } - -no_data: - /* footer */ - to = sizeof(m->hdr) + sizeof(m->footer); - while (con->in_base_pos < to) { - left = to - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + - (con->in_base_pos - sizeof(m->hdr)), - left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } - dout(20, "read_message_partial got msg %p\n", m); - - /* crc ok? */ - if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { - derr(0, "read_message_partial %p front crc %u != expected %u\n", - con->in_msg, - con->in_front_crc, m->footer.front_crc); - return -EIO; - } - if (con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { - derr(0, "read_message_partial %p data crc %u != expected %u\n", - con->in_msg, - con->in_data_crc, m->footer.data_crc); - return -EIO; - } - - /* did i learn my ip? */ - if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { - /* - * in practice, we learn our ip from the first incoming mon - * message, before anyone else knows we exist, so this is - * safe. - */ - con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr; - dout(10, "read_message_partial learned my addr is " - "%u.%u.%u.%u:%u\n", - IPQUADPORT(con->msgr->inst.addr.ipaddr)); - } - - return 1; /* done! 
*/ -} - -static void process_message(struct ceph_connection *con) -{ - /* if first message, set peer_name */ - if (con->peer_name.type == 0) - con->peer_name = con->in_msg->hdr.src.name; - - spin_lock(&con->out_queue_lock); - con->in_seq++; - spin_unlock(&con->out_queue_lock); - - dout(1, "===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n", - con->in_msg, le64_to_cpu(con->in_msg->hdr.seq), - ENTITY_NAME(con->in_msg->hdr.src.name), - le16_to_cpu(con->in_msg->hdr.type), - ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)), - le32_to_cpu(con->in_msg->hdr.front_len), - le32_to_cpu(con->in_msg->hdr.data_len), - con->in_front_crc, con->in_data_crc); - con->msgr->dispatch(con->msgr->parent, con->in_msg); - con->in_msg = NULL; - con->in_tag = CEPH_MSGR_TAG_READY; -} - -/* - * prepare to read an ack - */ -static void prepare_read_ack(struct ceph_connection *con) -{ - con->in_base_pos = 0; -} - -/* - * read (part of) an ack - */ -static int read_ack_partial(struct ceph_connection *con) -{ - while (con->in_base_pos < sizeof(con->in_partial_ack)) { - int left = sizeof(con->in_partial_ack) - con->in_base_pos; - int ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_partial_ack + - con->in_base_pos, left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } - return 1; /* done */ -} - -static void process_ack(struct ceph_connection *con) -{ - struct ceph_msg *m; - u32 ack = le32_to_cpu(con->in_partial_ack); - u64 seq; - - spin_lock(&con->out_queue_lock); - while (!list_empty(&con->out_sent)) { - m = list_entry(con->out_sent.next, struct ceph_msg, list_head); - seq = le64_to_cpu(m->hdr.seq); - if (seq > ack) - break; - dout(5, "got ack for seq %llu type %d at %p\n", seq, - le16_to_cpu(m->hdr.type), m); - list_del_init(&m->list_head); - ceph_msg_put(m); - } - spin_unlock(&con->out_queue_lock); - con->in_tag = CEPH_MSGR_TAG_READY; -} - - -/* - * read portion of connect-side handshake on a new connection - */ -static int read_connect_partial(struct 
ceph_connection *con) -{ - int ret, to; - dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos); - - /* peer's banner */ - to = strlen(CEPH_BANNER); - while (con->in_base_pos < to) { - int left = to - con->in_base_pos; - int have = con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_banner + have, - left); - if (ret <= 0) - goto out; - con->in_base_pos += ret; - } - - /* peer's addr */ - to += sizeof(con->actual_peer_addr); - while (con->in_base_pos < to) { - int left = to - con->in_base_pos; - int have = sizeof(con->actual_peer_addr) - left; - ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->actual_peer_addr + have, - left); - if (ret <= 0) - goto out; - con->in_base_pos += ret; - } - - /* in_tag */ - to += 1; - if (con->in_base_pos < to) { - ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); - if (ret <= 0) - goto out; - con->in_base_pos += ret; + goto out; + con->in_base_pos += ret; } + /* TAG_READY is followed by a u8 flags */ if (con->in_tag == CEPH_MSGR_TAG_READY) { to++; if (con->in_base_pos < to) { @@ -1349,6 +1036,7 @@ static int read_connect_partial(struct ceph_connection *con) } } + /* TAG_RETRY_SESSION is followed by a __le32 connect_seq */ if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) { /* peer's connect_seq */ to += sizeof(con->in_connect.connect_seq); @@ -1363,6 +1051,8 @@ static int read_connect_partial(struct ceph_connection *con) con->in_base_pos += ret; } } + + /* TAG_RETRY_GLOBAL is followed by a __le32 global_seq */ if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { /* peer's global_seq */ to += sizeof(con->in_connect.global_seq); @@ -1377,18 +1067,31 @@ static int read_connect_partial(struct ceph_connection *con) con->in_base_pos += ret; } } - ret = 1; -out: - dout(20, "read_connect_partial %p end at %d ret %d\n", con, - con->in_base_pos, ret); - dout(20, "read_connect_partial connect_seq = %u, global_seq = %u\n", - le32_to_cpu(con->in_connect.connect_seq), + ret = 1; /* done */ + dout(20, 
"read_partial_connect %p connect_seq = %u, global_seq = %u\n", + con, le32_to_cpu(con->in_connect.connect_seq), le32_to_cpu(con->in_connect.global_seq)); - return ret; /* done */ +out: + return ret; +} + +/* + * Verify the hello banner looks okay. + */ +static int verify_hello(struct ceph_connection *con) +{ + if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + derr(10, "connection to/from %u.%u.%u.%u:%u has bad banner\n", + IPQUADPORT(con->peer_addr.ipaddr)); + con->error_msg = "protocol error, bad banner"; + return -1; + } + return 0; } /* - * Reset a connection + * Reset a connection. Discard all incoming and outgoing messages + * and clear *_seq state. */ static void reset_connection(struct ceph_connection *con) { @@ -1410,16 +1113,6 @@ static void reset_connection(struct ceph_connection *con) spin_unlock(&con->out_queue_lock); } -static int verify_hello(struct ceph_connection *con) -{ - if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - derr(10, "connection from %u.%u.%u.%u:%u with bad banner\n", - IPQUADPORT(con->peer_addr.ipaddr)); - con->error_msg = "protocol error, bad banner"; - return -1; - } - return 0; -} static int process_connect(struct ceph_connection *con) { @@ -1428,7 +1121,11 @@ static int process_connect(struct ceph_connection *con) if (verify_hello(con) < 0) return -1; - /* verify peer addr */ + /* + * Make sure the other end is who we wanted. note that the other + * end may not yet know their ip address, so if it's 0.0.0.0, give + * them the benefit of the doubt. 
+ */ if (!ceph_entity_addr_is_local(&con->peer_addr, &con->actual_peer_addr) && con->actual_peer_addr.ipaddr.sin_addr.s_addr != 0) { @@ -1444,15 +1141,27 @@ static int process_connect(struct ceph_connection *con) switch (con->in_tag) { case CEPH_MSGR_TAG_RESETSESSION: + /* + * If we connected with a large connect_seq but the peer + * has no record of a session with us (no connection, or + * connect_seq == 0), they will send RESETSESSION to indicate + * that they must have reset their session, and may have + * dropped messages. + */ dout(10, "process_connect got RESET peer seq %u\n", le32_to_cpu(con->in_connect.connect_seq)); reset_connection(con); prepare_write_connect_retry(con->msgr, con); prepare_read_connect(con); + /* Tell ceph about it. */ con->msgr->peer_reset(con->msgr->parent, &con->peer_addr, &con->peer_name); break; case CEPH_MSGR_TAG_RETRY_SESSION: + /* + * If we sent a smaller connect_seq than the peer has, try + * again with a larger value. + */ dout(10, "process_connect got RETRY my seq = %u, peer_seq = %u\n", le32_to_cpu(con->out_connect.connect_seq), @@ -1462,6 +1171,10 @@ static int process_connect(struct ceph_connection *con) prepare_read_connect(con); break; case CEPH_MSGR_TAG_RETRY_GLOBAL: + /* + * If we sent a smaller global_seq than the peer has, try + * again with a larger value. + */ dout(10, "process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n", con->global_seq, le32_to_cpu(con->in_connect.global_seq)); @@ -1472,6 +1185,12 @@ static int process_connect(struct ceph_connection *con) prepare_read_connect(con); break; case CEPH_MSGR_TAG_WAIT: + /* + * If there is a connection race (we are opening connections to + * each other), one of us may just have to WAIT. We will keep + * our queued messages, in expectation of being replaced by an + * incoming connection. 
+ */ dout(10, "process_connect peer connecting WAIT\n"); set_bit(WAIT, &con->state); con_close_socket(con); @@ -1479,8 +1198,9 @@ static int process_connect(struct ceph_connection *con) case CEPH_MSGR_TAG_READY: dout(10, "process_connect got READY, now open\n"); clear_bit(CONNECTING, &con->state); - con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX; - con->delay = 0; /* reset backoffmemory */ + if (con->in_flags & CEPH_MSG_CONNECT_LOSSYTX) + set_bit(LOSSYRX, &con->state); + con->delay = 0; /* reset backoff memory */ break; default: derr(1, "process_connect protocol error, will retry\n"); @@ -1492,9 +1212,10 @@ static int process_connect(struct ceph_connection *con) /* - * read portion of accept-side handshake on a newly accepted connection + * Read all or part of the accept-side handshake on a newly accepted + * connection. */ -static int read_accept_partial(struct ceph_connection *con) +static int read_partial_accept(struct ceph_connection *con) { int ret; int to; @@ -1536,145 +1257,445 @@ static int read_accept_partial(struct ceph_connection *con) con->in_base_pos += ret; } + return 1; /* done */ +} + +/* + * Call after a new connection's handshake has been read. + */ +static int process_accept(struct ceph_connection *con) +{ + struct ceph_connection *existing; + struct ceph_messenger *msgr = con->msgr; + u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq); + u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq); + + if (verify_hello(con) < 0) + return -1; + + /* note flags */ + if (con->in_flags & CEPH_MSG_CONNECT_LOSSYTX) + set_bit(LOSSYRX, &con->state); + + /* do we have an existing connection for this peer? 
*/ + if (radix_tree_preload(GFP_NOFS) < 0) { + derr(10, "ENOMEM in process_accept\n"); + con->error_msg = "out of memory"; + return -1; + } + spin_lock(&msgr->con_lock); + existing = __get_connection(msgr, &con->peer_addr); + if (existing) { + if (peer_gseq < existing->global_seq) { + /* retry_global */ + con->global_seq = existing->global_seq; + con->out_connect.global_seq = + cpu_to_le32(con->global_seq); + prepare_write_accept_retry(con, + &tag_retry_global, + &con->out_connect.global_seq); + } else if (test_bit(LOSSYTX, &existing->state)) { + dout(20, "process_accept %p replacing LOSSYTX %p\n", + con, existing); + reset_connection(existing); + __replace_connection(msgr, existing, con); + prepare_write_accept_ready(con); + } else if (peer_cseq < existing->connect_seq) { + if (peer_cseq == 0) { + reset_connection(existing); + __replace_connection(msgr, existing, con); + prepare_write_accept_ready(con); + con->msgr->peer_reset(con->msgr->parent, + &con->peer_addr, + &con->peer_name); + } else { + /* old attempt or peer didn't get the READY */ + /* send retry with peers connect seq */ + con->connect_seq = existing->connect_seq; + con->out_connect.connect_seq = + cpu_to_le32(con->connect_seq); + prepare_write_accept_retry(con, + &tag_retry_session, + &con->out_connect.connect_seq); + } + } else if (peer_cseq == existing->connect_seq && + (test_bit(CONNECTING, &existing->state) || + test_bit(STANDBY, &existing->state) || + test_bit(WAIT, &existing->state))) { + /* connection race */ + dout(20, "process_accept connection race state = %lu\n", + con->state); + if (ceph_entity_addr_equal(&msgr->inst.addr, + &con->peer_addr)) { + /* incoming connection wins.. */ + /* replace existing with new connection */ + __replace_connection(msgr, existing, con); + prepare_write_accept_ready(con); + } else { + /* our existing outgoing connection wins.. 
+ tell peer to wait for our outgoing + connection to go through */ + prepare_write_tag(con, &tag_wait); + } + } else if (existing->connect_seq == 0 && + peer_cseq > existing->connect_seq) { + /* we reset and already reconnecting */ + prepare_write_tag(con, &tag_reset); + } else { + /* reconnect case, replace connection */ + __replace_connection(msgr, existing, con); + prepare_write_accept_ready(con); + } + put_connection(existing); + } else if (peer_cseq > 0) { + dout(20, "process_accept no existing connection, we reset\n"); + prepare_write_tag(con, &tag_reset); + } else { + dout(20, "process_accept no existing connection, opening\n"); + __register_connection(msgr, con); + con->global_seq = peer_gseq; + con->connect_seq = peer_cseq + 1; + prepare_write_accept_ready(con); + } + spin_unlock(&msgr->con_lock); + radix_tree_preload_end(); + + ceph_queue_con(con); + put_connection(con); + return 0; +} + +/* + * read (part of) an ack + */ +static int read_partial_ack(struct ceph_connection *con) +{ + while (con->in_base_pos < sizeof(con->in_temp_ack)) { + int left = sizeof(con->in_temp_ack) - con->in_base_pos; + int ret = ceph_tcp_recvmsg(con->sock, + (char *)&con->in_temp_ack + + con->in_base_pos, left); + if (ret <= 0) + return ret; + con->in_base_pos += ret; + } + return 1; /* done */ +} + +/* + * We can finally discard anything that's been acked. + */ +static void process_ack(struct ceph_connection *con) +{ + struct ceph_msg *m; + u32 ack = le32_to_cpu(con->in_temp_ack); + u64 seq; + + spin_lock(&con->out_queue_lock); + while (!list_empty(&con->out_sent)) { + m = list_entry(con->out_sent.next, struct ceph_msg, list_head); + seq = le64_to_cpu(m->hdr.seq); + if (seq > ack) + break; + dout(5, "got ack for seq %llu type %d at %p\n", seq, + le16_to_cpu(m->hdr.type), m); + list_del_init(&m->list_head); + ceph_msg_put(m); + } + spin_unlock(&con->out_queue_lock); + con->in_tag = CEPH_MSGR_TAG_READY; +} + + + + + + +/* + * read (part of) a message. 
+ */ +static int read_partial_message(struct ceph_connection *con) +{ + struct ceph_msg *m = con->in_msg; + void *p; + int ret; + int to, want, left; + unsigned front_len, data_len, data_off; + + dout(20, "read_partial_message con %p msg %p\n", con, m); + + /* header */ + while (con->in_base_pos < sizeof(m->hdr)) { + left = sizeof(m->hdr) - con->in_base_pos; + ret = ceph_tcp_recvmsg(con->sock, + (char *)&m->hdr + con->in_base_pos, + left); + if (ret <= 0) + return ret; + con->in_base_pos += ret; + if (con->in_base_pos == sizeof(m->hdr)) { + u32 crc = crc32c_le(0, (void *)&m->hdr, + sizeof(m->hdr) - sizeof(m->hdr.crc)); + if (crc != le32_to_cpu(m->hdr.crc)) { + derr(0, "read_partial_message %p bad hdr crc" + " %u != expected %u\n", + m, crc, m->hdr.crc); + return -EIO; + } + } + } + + /* front */ + front_len = le32_to_cpu(m->hdr.front_len); + while (m->front.iov_len < front_len) { + if (m->front.iov_base == NULL) { + m->front.iov_base = kmalloc(front_len, GFP_NOFS); + if (m->front.iov_base == NULL) + return -ENOMEM; + } + left = front_len - m->front.iov_len; + ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base + + m->front.iov_len, left); + if (ret <= 0) + return ret; + m->front.iov_len += ret; + if (m->front.iov_len == front_len) + con->in_front_crc = crc32c_le(0, m->front.iov_base, + m->front.iov_len); + } + + /* (page) data */ + data_len = le32_to_cpu(m->hdr.data_len); + data_off = le32_to_cpu(m->hdr.data_off); + if (data_len == 0) + goto no_data; + if (m->nr_pages == 0) { + con->in_msg_pos.page = 0; + con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; + con->in_msg_pos.data_pos = 0; + /* find pages for data payload */ + want = calc_pages_for(data_off & ~PAGE_MASK, data_len); + ret = 0; + BUG_ON(!con->msgr->prepare_pages); + ret = con->msgr->prepare_pages(con->msgr->parent, m, want); + if (ret < 0) { + dout(10, "prepare_pages failed, skipping payload\n"); + con->in_base_pos = -data_len - sizeof(m->footer); + ceph_msg_put(con->in_msg); + con->in_msg = 
NULL; + con->in_tag = CEPH_MSGR_TAG_READY; + return 0; + } + BUG_ON(m->nr_pages < want); + } + while (con->in_msg_pos.data_pos < data_len) { + left = min((int)(data_len - con->in_msg_pos.data_pos), + (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); + mutex_lock(&m->page_mutex); + if (!m->pages) { + dout(10, "pages revoked during msg read\n"); + mutex_unlock(&m->page_mutex); + con->in_base_pos = con->in_msg_pos.data_pos - data_len - + sizeof(m->footer); + ceph_msg_put(m); + con->in_msg = NULL; + con->in_tag = CEPH_MSGR_TAG_READY; + return 0; + } + p = kmap(m->pages[con->in_msg_pos.page]); + ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, + left); + if (ret > 0) + con->in_data_crc = + crc32c_le(con->in_data_crc, + p + con->in_msg_pos.page_pos, ret); + kunmap(m->pages[con->in_msg_pos.page]); + mutex_unlock(&m->page_mutex); + if (ret <= 0) + return ret; + con->in_msg_pos.data_pos += ret; + con->in_msg_pos.page_pos += ret; + if (con->in_msg_pos.page_pos == PAGE_SIZE) { + con->in_msg_pos.page_pos = 0; + con->in_msg_pos.page++; + } + } + +no_data: + /* footer */ + to = sizeof(m->hdr) + sizeof(m->footer); + while (con->in_base_pos < to) { + left = to - con->in_base_pos; + ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + + (con->in_base_pos - sizeof(m->hdr)), + left); + if (ret <= 0) + return ret; + con->in_base_pos += ret; + } + dout(20, "read_partial_message got msg %p\n", m); + + /* crc ok? */ + if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { + derr(0, "read_partial_message %p front crc %u != expected %u\n", + con->in_msg, + con->in_front_crc, m->footer.front_crc); + return -EIO; + } + if (con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { + derr(0, "read_partial_message %p data crc %u != expected %u\n", + con->in_msg, + con->in_data_crc, m->footer.data_crc); + return -EIO; + } + + /* did i learn my ip? 
*/ + if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { + /* + * in practice, we learn our ip from the first incoming mon + * message, before anyone else knows we exist, so this is + * safe. + */ + con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr; + dout(10, "read_partial_message learned my addr is " + "%u.%u.%u.%u:%u\n", + IPQUADPORT(con->msgr->inst.addr.ipaddr)); + } + + return 1; /* done! */ +} + +/* + * Process message. This happens in the worker thread. The callback should + * be careful not to do anything that waits on other incoming messages or it + * may deadlock. + */ +static void process_message(struct ceph_connection *con) +{ + /* if first message, set peer_name */ + if (con->peer_name.type == 0) + con->peer_name = con->in_msg->hdr.src.name; + + spin_lock(&con->out_queue_lock); + con->in_seq++; + spin_unlock(&con->out_queue_lock); + + dout(1, "===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n", + con->in_msg, le64_to_cpu(con->in_msg->hdr.seq), + ENTITY_NAME(con->in_msg->hdr.src.name), + le16_to_cpu(con->in_msg->hdr.type), + ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)), + le32_to_cpu(con->in_msg->hdr.front_len), + le32_to_cpu(con->in_msg->hdr.data_len), + con->in_front_crc, con->in_data_crc); + con->msgr->dispatch(con->msgr->parent, con->in_msg); + con->in_msg = NULL; + con->in_tag = CEPH_MSGR_TAG_READY; +} + - return 1; /* done */ -} -/* - * replace another connection - * (old and new should be for the _same_ peer, - * and thus in the same pos in the radix tree) - */ -static void __replace_connection(struct ceph_messenger *msgr, - struct ceph_connection *old, - struct ceph_connection *new) -{ - /* take old connections message queue */ - spin_lock(&old->out_queue_lock); - if (!list_empty(&old->out_queue)) - list_splice_init(&new->out_queue, &old->out_queue); - spin_unlock(&old->out_queue_lock); - new->connect_seq = le32_to_cpu(new->in_connect.connect_seq); - new->out_seq = old->out_seq; - /* replace list 
entry */ - list_add(&new->list_bucket, &old->list_bucket); - list_del_init(&old->list_bucket); - set_bit(CLOSED, &old->state); - put_connection(old); /* dec reference count */ - clear_bit(ACCEPTING, &new->state); - prepare_write_accept_ready(new); -} /* - * call after a new connection's handshake has completed + * Write something to the socket. Called in a worker thread when the + * socket appears to be writeable and we have something ready to send. */ -static int process_accept(struct ceph_connection *con) +static int try_write(struct ceph_connection *con) { - struct ceph_connection *existing; struct ceph_messenger *msgr = con->msgr; - u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq); - u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq); + int ret = 1; - if (verify_hello(con) < 0) - return -1; + dout(30, "try_write start %p state %lu nref %d\n", con, con->state, + atomic_read(&con->nref)); - /* note flags */ - con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX; +more: + dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes); - /* connect */ - /* do we have an existing connection for this peer? */ - if (radix_tree_preload(GFP_NOFS) < 0) { - derr(10, "ENOMEM in process_accept\n"); - con->error_msg = "out of memory"; - return -1; + /* open the socket first? */ + if (con->sock == NULL) { + /* + * if we were STANDBY and are reconnecting _this_ + * connection, bump connect_seq now. Always bump + * global_seq. 
+ */ + if (test_and_clear_bit(STANDBY, &con->state)) + con->connect_seq++; + con->global_seq = get_global_seq(msgr, 0); + + prepare_write_connect(msgr, con); + prepare_read_connect(con); + set_bit(CONNECTING, &con->state); + + con->in_tag = CEPH_MSGR_TAG_READY; + dout(5, "try_write initiating connect on %p new state %lu\n", + con, con->state); + con->sock = ceph_tcp_connect(con); + if (IS_ERR(con->sock)) { + con->sock = NULL; + con->error_msg = "connect error"; + ret = -1; + goto out; + } } - spin_lock(&msgr->con_lock); - existing = __get_connection(msgr, &con->peer_addr); - if (existing) { - if (peer_gseq < existing->global_seq) { - /* retry_global */ - con->global_seq = existing->global_seq; - con->out_connect.global_seq = - cpu_to_le32(con->global_seq); - prepare_write_accept_retry(con, - &tag_retry_global, - &con->out_connect.global_seq); - } else if (test_bit(LOSSYTX, &existing->state)) { - dout(20, "process_accept replacing existing LOSSYTX %p\n", - existing); - reset_connection(existing); - __replace_connection(msgr, existing, con); - } else if (peer_cseq < existing->connect_seq) { - if (peer_cseq == 0) { - /* reset existing connection */ - reset_connection(existing); - /* replace connection */ - __replace_connection(msgr, existing, con); - con->msgr->peer_reset(con->msgr->parent, - &con->peer_addr, - &con->peer_name); - } else { - /* old attempt or peer didn't get the READY */ - /* send retry with peers connect seq */ - con->connect_seq = existing->connect_seq; - con->out_connect.connect_seq = - cpu_to_le32(con->connect_seq); - prepare_write_accept_retry(con, - &tag_retry_session, - &con->out_connect.connect_seq); - } - } else if (peer_cseq == existing->connect_seq && - (test_bit(CONNECTING, &existing->state) || - test_bit(STANDBY, &existing->state) || - test_bit(WAIT, &existing->state))) { - /* connection race */ - dout(20, "process_accept connection race state = %lu\n", - con->state); - if (ceph_entity_addr_equal(&msgr->inst.addr, - &con->peer_addr)) { - 
/* incoming connection wins.. */ - /* replace existing with new connection */ - __replace_connection(msgr, existing, con); - } else { - /* our existing outgoing connection wins.. - tell peer to wait for our outgoing - connection to go through */ - prepare_write_accept_reply(con, &tag_wait); - } - } else if (existing->connect_seq == 0 && - peer_cseq > existing->connect_seq) { - /* we reset and already reconnecting */ - prepare_write_accept_reply(con, &tag_reset); - } else { - /* reconnect case, replace connection */ - __replace_connection(msgr, existing, con); + +more_kvec: + /* kvec data queued? */ + if (con->out_kvec_left) { + ret = write_partial_kvec(con); + if (ret <= 0) + goto done; + if (ret < 0) { + dout(30, "try_write write_partial_kvec err %d\n", ret); + goto done; } - put_connection(existing); - } else if (peer_cseq > 0) { - dout(20, "process_accept no existing connection, we reset\n"); - prepare_write_accept_reply(con, &tag_reset); - } else { - dout(20, "process_accept no existing connection, opening\n"); - __register_connection(msgr, con); - con->global_seq = peer_gseq; - con->connect_seq = peer_cseq + 1; - prepare_write_accept_ready(con); } - spin_unlock(&msgr->con_lock); - radix_tree_preload_end(); - ceph_queue_con(con); - put_connection(con); - return 0; + /* msg pages? */ + if (con->out_msg) { + ret = write_partial_msg_pages(con); + if (ret == 1) + goto more_kvec; /* we need to send the footer, too! */ + if (ret == 0) + goto done; + if (ret < 0) { + dout(30, "try_write write_partial_msg_pages err %d\n", + ret); + goto done; + } + } + + /* is anything else pending? */ + spin_lock(&con->out_queue_lock); + if (!list_empty(&con->out_queue)) { + prepare_write_message(con); + spin_unlock(&con->out_queue_lock); + goto more; + } + if (con->in_seq > con->in_seq_acked) { + prepare_write_ack(con); + spin_unlock(&con->out_queue_lock); + goto more; + } + + /* Nothing to do! 
*/ + clear_bit(WRITE_PENDING, &con->state); + dout(30, "try_write nothing else to write.\n"); + spin_unlock(&con->out_queue_lock); +done: + ret = 0; +out: + dout(30, "try_write done on %p\n", con); + return ret; } + /* - * worker function when data is available on the socket + * Read what we can from the socket. */ static int try_read(struct ceph_connection *con) { @@ -1690,7 +1711,7 @@ static int try_read(struct ceph_connection *con) more: if (test_bit(ACCEPTING, &con->state)) { dout(20, "try_read accepting\n"); - ret = read_accept_partial(con); + ret = read_partial_accept(con); if (ret <= 0) goto done; if (process_accept(con) < 0) { @@ -1701,7 +1722,7 @@ more: } if (test_bit(CONNECTING, &con->state)) { dout(20, "try_read connecting\n"); - ret = read_connect_partial(con); + ret = read_partial_connect(con); if (ret <= 0) goto done; if (process_connect(con) < 0) { @@ -1712,7 +1733,11 @@ more: } if (con->in_base_pos < 0) { - /* skipping + discarding content */ + /* + * skipping + discarding content. + * + * FIXME: there must be a better way to do this! + */ static char buf[1024]; int skip = min(1024, -con->in_base_pos); dout(20, "skipping %d / %d bytes\n", skip, -con->in_base_pos); @@ -1724,6 +1749,9 @@ more: goto more; } if (con->in_tag == CEPH_MSGR_TAG_READY) { + /* + * what's next? + */ ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); if (ret <= 0) goto done; @@ -1743,7 +1771,7 @@ more: } } if (con->in_tag == CEPH_MSGR_TAG_MSG) { - ret = read_message_partial(con); + ret = read_partial_message(con); if (ret == -EIO) { con->error_msg = "bad crc"; goto out; @@ -1756,7 +1784,7 @@ more: goto more; } if (con->in_tag == CEPH_MSGR_TAG_ACK) { - ret = read_ack_partial(con); + ret = read_partial_ack(con); if (ret <= 0) goto done; process_ack(con); @@ -1777,6 +1805,39 @@ bad_tag: } +/* + * Atomically queue work on a connection. Bump @con reference to + * avoid races with connection teardown. 
+ * + * There is some trickery going on with QUEUED, BUSY, and BACKOFF: + * + * + */ +static void ceph_queue_con(struct ceph_connection *con) +{ + if (test_bit(WAIT, &con->state) || + test_bit(CLOSED, &con->state) || + test_bit(BACKOFF, &con->state)) { + dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED|BACKOFF\n", + con); + return; + } + + atomic_inc(&con->nref); + dout(40, "ceph_queue_con %p %d -> %d\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref)); + + set_bit(QUEUED, &con->state); + if (test_bit(BUSY, &con->state) || + !queue_work(ceph_msgr_wq, &con->work.work)) { + dout(40, "ceph_queue_con %p - already BUSY or queued\n", con); + put_connection(con); + } +} + +/* + * Do some work on a connection. Drop a connection ref when we're done. + */ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, @@ -1823,6 +1884,60 @@ out: } +/* + * failure case + * A retry mechanism is used with exponential backoff + */ +static void ceph_fault(struct ceph_connection *con) +{ + derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name), + IPQUADPORT(con->peer_addr.ipaddr), con->error_msg); + dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n", + con, con->state, IPQUADPORT(con->peer_addr.ipaddr)); + + if (test_bit(LOSSYTX, &con->state)) { + dout(30, "fault on LOSSYTX channel\n"); + remove_connection(con->msgr, con); + return; + } + + con_close_socket(con); + + /* hmm? */ + BUG_ON(test_bit(WAIT, &con->state)); + + /* + * If there are no messages in the queue, place the + * connection in a STANDBY state. 
otherwise, retry with + * delay + */ + spin_lock(&con->out_queue_lock); + if (list_empty(&con->out_queue)) { + dout(10, "fault setting STANDBY\n"); + set_bit(STANDBY, &con->state); + spin_unlock(&con->out_queue_lock); + return; + } + + dout(10, "fault setting BACKOFF\n"); + set_bit(BACKOFF, &con->state); + + if (con->delay == 0) + con->delay = BASE_DELAY_INTERVAL; + else if (con->delay < MAX_DELAY_INTERVAL) + con->delay *= 2; + + atomic_inc(&con->nref); + dout(40, "fault queueing %p %d -> %d delay %lu\n", con, + atomic_read(&con->nref) - 1, atomic_read(&con->nref), + con->delay); + queue_delayed_work(ceph_msgr_wq, &con->work, + round_jiffies_relative(con->delay)); + + list_splice_init(&con->out_sent, &con->out_queue); + spin_unlock(&con->out_queue_lock); +} + /* * worker function when listener receives a connect diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 0faeb4916561..3899137781f4 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -11,17 +11,52 @@ #include "ceph_fs.h" +/* + * Ceph uses the messenger to exchange ceph_msg messages with + * other hosts in the system. The messenger provides ordered and + * reliable delivery. It tolerates TCP disconnects by reconnecting + * (with exponential backoff) in the case of a fault (disconnection, + * bad crc, protocol error). Acks allow sent messages to be discarded + * by the sender. + * + * The network topology is flat: there is no "client" or "server," and + * any node can initiate a connection (i.e., send messages) to any other + * node. There is a fair bit of complexity to handle the "connection + * race" case where two nodes are simultaneously connecting to each other + * so that the end result is a single session. + * + * The messenger can also send messages in "lossy" mode, where there is + * no error recovery or connect retry... the message is just dropped if + * something goes wrong. 
+ */
+
 struct ceph_msg;
 
+#define IPQUADPORT(n) \
+	(unsigned int)((be32_to_cpu((n).sin_addr.s_addr) >> 24)) & 0xFF, \
+	(unsigned int)((be32_to_cpu((n).sin_addr.s_addr)) >> 16) & 0xFF, \
+	(unsigned int)((be32_to_cpu((n).sin_addr.s_addr))>>8) & 0xFF, \
+	(unsigned int)((be32_to_cpu((n).sin_addr.s_addr))) & 0xFF, \
+	(unsigned int)(ntohs((n).sin_port))
+
+
 extern struct workqueue_struct *ceph_msgr_wq; /* receive work queue */
 
+/*
+ * Ceph defines these callbacks for handling events:
+ */
+/* handle an incoming message. */
 typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
-typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
-					struct ceph_entity_name *pn);
+/* an incoming message has a data payload; tell me what pages I
+ * should read the data into. */
 typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
 					  int want);
+/* a remote host has terminated a message exchange session, and messages
+ * we sent (or they tried to send us) may be lost. 
*/ +typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr, + struct ceph_entity_name *pn); -static __inline__ const char *ceph_name_type_str(int t) { +static inline const char *ceph_name_type_str(int t) { switch (t) { case CEPH_ENTITY_TYPE_MON: return "mon"; case CEPH_ENTITY_TYPE_MDS: return "mds"; @@ -40,22 +75,37 @@ static __inline__ const char *ceph_name_type_str(int t) { le32_to_cpu((n).num) struct ceph_messenger { - void *parent; + void *parent; /* normally struct ceph_client * */ ceph_msgr_dispatch_t dispatch; ceph_msgr_peer_reset_t peer_reset; ceph_msgr_prepare_pages_t prepare_pages; + struct ceph_entity_inst inst; /* my name+address */ + struct socket *listen_sock; /* listening socket */ struct work_struct awork; /* accept work */ + spinlock_t con_lock; - struct list_head con_all; /* all connections */ - struct list_head con_accepting; /* accepting */ + struct list_head con_all; /* all open connections */ + struct list_head con_accepting; /* accepting */ struct radix_tree_root con_tree; /* established */ - struct page *zero_page; + + struct page *zero_page; /* used in certain error cases */ + + /* + * the global_seq counts connections i (attempt to) initiate + * in order to disambiguate certain connect race conditions. + */ u32 global_seq; spinlock_t global_seq_lock; }; +/* + * a single message. it contains a header (src, dest, message type, etc.), + * footer (crc values, mainly), a "front" message body, and possibly a + * data payload (stored in some number of pages). The page_mutex protects + * access to the page vector. 
+ */ struct ceph_msg { struct ceph_msg_header hdr; /* header */ struct ceph_msg_footer footer; /* footer */ @@ -68,81 +118,100 @@ struct ceph_msg { }; struct ceph_msg_pos { - int page, page_pos; /* which page; -3=tag, -2=hdr, -1=front */ - int data_pos; - int did_page_crc; + int page, page_pos; /* which page; offset in page */ + int data_pos; /* offset in data payload */ + int did_page_crc; /* true if we've calculated crc for current page */ }; /* ceph connection fault delay defaults */ #define BASE_DELAY_INTERVAL (HZ/2) #define MAX_DELAY_INTERVAL (5 * 60 * HZ) -/* ceph_connection state bit flags */ -#define LOSSYTX 0 /* close channel on errors */ -#define LOSSYRX 1 /* close channel on errors */ +/* + * ceph_connection state bit flags + * + * QUEUED, BUSY, and BACKOFF are used together to ensure that only a + * single thread is currently opening, reading or writing data to the + * socket. + */ +#define LOSSYTX 0 /* we can close channel or drop messages on errors */ +#define LOSSYRX 1 /* peer may reset/drop messages */ #define CONNECTING 2 #define ACCEPTING 3 -#define WRITE_PENDING 4 /* we have data to send */ -#define QUEUED 5 /* there is work to be done */ +#define WRITE_PENDING 4 /* we have data ready to send */ +#define QUEUED 5 /* there is work queued on this connection */ #define BUSY 6 /* work is being done */ #define BACKOFF 7 /* backing off; will retry */ -#define STANDBY 8 /* standby, when socket state close, no messages */ -#define WAIT 9 /* wait for peer to connect */ -#define CLOSED 10 /* we've closed the connection */ +#define STANDBY 8 /* no outgoing messages, socket closed. we keep + * the ceph_connection around to maintain shared + * state with the peer. 
*/ +#define WAIT 9 /* waiting for peer to connect to us (during a + * connection race) */ +#define CLOSED 10 /* we've closed the connection */ #define SOCK_CLOSED 11 /* socket state changed to closed */ -#define REGISTERED 12 - - +#define REGISTERED 12 /* connection appears in con_tree */ + +/* + * A single connection with another host. + * + * We maintain a queue of outgoing messages, and some session state to + * ensure that we can preserve the lossless, ordered delivery of + * messages in the case of a TCP disconnect. + */ struct ceph_connection { struct ceph_messenger *msgr; - struct socket *sock; /* connection socket */ - unsigned long state; /* connection state */ - const char *error_msg; + struct socket *sock; + unsigned long state; /* connection state (see flags above) */ + const char *error_msg; /* error message, if any */ atomic_t nref; - struct list_head list_all; /* msgr->con_all */ + struct list_head list_all; /* msgr->con_all */ struct list_head list_bucket; /* msgr->con_tree or con_accepting */ struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ - __u32 connect_seq, global_seq; - bool lossy_rx; /* true if sender is lossy */ - + __u32 connect_seq, global_seq; /* identify the most recent connection + attempt for this connection, client */ + /* out queue */ spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */ struct list_head out_queue; struct list_head out_sent; /* sending/sent but unacked */ - __u32 out_seq; /* last message queued for send */ + __u32 in_seq, in_seq_acked; /* last message received, acked */ - /* negotiation temps */ + /* connection negotiation temps */ char in_banner[CEPH_BANNER_MAX_LEN]; struct ceph_msg_connect out_connect, in_connect; struct ceph_entity_addr actual_peer_addr; + u8 in_flags; /* follows TAG_READY */ - /* out */ - struct ceph_msg *out_msg; + /* message out temps */ + struct ceph_msg *out_msg; /* sending message (== tail of + out_sent) */ struct 
ceph_msg_pos out_msg_pos; - __le32 out32; - struct kvec out_kvec[6], + + struct kvec out_kvec[6], /* sending header/footer data */ *out_kvec_cur; - int out_kvec_left; /* kvec's left */ - int out_kvec_bytes; /* bytes left */ - int out_more; /* there is more data after this kvec */ - - /* partially read message contents */ - char in_tag; - u8 in_flags; - int in_base_pos; /* for ack seq, or msg headers, or handshake */ - __le32 in_partial_ack; + int out_kvec_left; /* kvec's left in out_kvec */ + int out_kvec_bytes; /* total bytes left */ + int out_more; /* there is more data after the kvecs */ + __le32 out_temp_ack; /* for writing an ack */ + + /* message in temps */ struct ceph_msg *in_msg; struct ceph_msg_pos in_msg_pos; - u32 in_front_crc, in_data_crc; + u32 in_front_crc, in_data_crc; /* calculated crc, for comparison + message footer */ + + char in_tag; /* protocol control byte */ + int in_base_pos; /* bytes read */ + __le32 in_temp_ack; /* for reading an ack */ struct delayed_work work; /* send|recv work */ - unsigned long delay; /* delay interval */ + unsigned long delay; /* current delay interval */ }; extern int ceph_msgr_init(void); @@ -154,13 +223,11 @@ extern void ceph_messenger_destroy(struct ceph_messenger *); extern void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr); -extern void ceph_queue_con(struct ceph_connection *con); - extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages); -static __inline__ struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) { +static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) { /*printk("ceph_msg_get %p %d -> %d\n", msg, atomic_read(&msg->nref), atomic_read(&msg->nref)+1);*/ atomic_inc(&msg->nref); diff --git a/src/kernel/super.h b/src/kernel/super.h index 3585ca9feb24..66feaff50f0f 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -54,13 +54,6 @@ extern int ceph_debug_mask; #define CEPH_BLOCK_SHIFT 20 /* 1 
MB blocks for purposes of statfs*/ #define CEPH_BLOCK (1 << CEPH_BLOCK_SHIFT) -#define IPQUADPORT(n) \ - (unsigned int)((be32_to_cpu((n).sin_addr.s_addr) >> 24)) & 0xFF, \ - (unsigned int)((be32_to_cpu((n).sin_addr.s_addr)) >> 16) & 0xFF, \ - (unsigned int)((be32_to_cpu((n).sin_addr.s_addr))>>8) & 0xFF, \ - (unsigned int)((be32_to_cpu((n).sin_addr.s_addr))) & 0xFF, \ - (unsigned int)(ntohs((n).sin_port)) - #if 0 # define dput(dentry) \