static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
+
+static void ceph_queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
-static void try_accept(struct work_struct *);
+static void ceph_fault(struct ceph_connection *con);
/*
- * workqueue
+ * work queue for all reading and writing to/from the socket.
*/
struct workqueue_struct *ceph_msgr_wq;
* socket callback functions
*/
-/* listen socket received a connect */
+/* listen socket received a connection */
static void ceph_accept_ready(struct sock *sk, int count_unused)
{
struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
queue_work(ceph_msgr_wq, &msgr->awork);
}
-/* Data available on socket or listen socket received a connect */
+/* data available on socket, or listen socket received a connect */
static void ceph_data_ready(struct sock *sk, int count_unused)
{
struct ceph_connection *con =
(struct ceph_connection *)sk->sk_user_data;
if (sk->sk_state != TCP_CLOSE_WAIT) {
- dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
+ dout(30, "ceph_data_ready on %p state = %lu, queueing work\n",
con, con->state);
ceph_queue_con(con);
}
}
-/* socket has bufferspace for writing */
+/* socket has buffer space for writing */
static void ceph_write_space(struct sock *sk)
{
struct ceph_connection *con =
(struct ceph_connection *)sk->sk_user_data;
- dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
-
- /* only queue to workqueue if a WRITE is pending */
+ /* only queue to workqueue if there is data we want to write. */
if (test_bit(WRITE_PENDING, &con->state)) {
- dout(30, "ceph_write_space %p queuing write work\n", con);
+ dout(30, "ceph_write_space %p queueing write work\n", con);
ceph_queue_con(con);
- }
+ } else
+ dout(30, "ceph_write_space %p nothing to write\n", con);
- /* Since we have our own write_space, Clear the SOCK_NOSPACE flag */
+ /* since we have our own write_space, clear the SOCK_NOSPACE flag */
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
}
-/* sockets state has change */
+/* socket's state has changed */
static void ceph_state_change(struct sock *sk)
{
struct ceph_connection *con =
break;
case TCP_ESTABLISHED:
dout(30, "ceph_state_change TCP_ESTABLISHED\n");
- ceph_write_space(sk);
+ ceph_queue_con(con);
break;
}
}
-/* make a listening socket active by setting up the data ready call back */
+/*
+ * set up socket callbacks
+ */
static void listen_sock_callbacks(struct socket *sock,
struct ceph_messenger *msgr)
{
sk->sk_data_ready = ceph_accept_ready;
}
-/* make a socket active by setting up the call back functions */
static void set_sock_callbacks(struct socket *sock,
struct ceph_connection *con)
{
*/
static struct socket *ceph_tcp_connect(struct ceph_connection *con)
{
- int ret;
struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
struct socket *sock;
+ int ret;
+ BUG_ON(con->sock);
ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret)
return ERR_PTR(ret);
-
con->sock = sock;
sock->sk->sk_allocation = GFP_NOFS;
set_sock_callbacks(sock, con);
- dout (20, "connect %u.%u.%u.%u:%u\n",
- IPQUADPORT(*(struct sockaddr_in *)paddr));
+ dout(20, "connect %u.%u.%u.%u:%u\n",
+ IPQUADPORT(*(struct sockaddr_in *)paddr));
ret = sock->ops->connect(sock, paddr,
sizeof(struct sockaddr_in), O_NONBLOCK);
if (ret == -EINPROGRESS) {
- dout(20, "connect EINPROGRESS sk_state = = %u\n",
+ dout(20, "connect %u.%u.%u.%u:%u EINPROGRESS sk_state = %u\n",
+ IPQUADPORT(*(struct sockaddr_in *)paddr),
sock->sk->sk_state);
ret = 0;
}
if (ret < 0) {
- /* TBD check for fatal errors, retry if not fatal.. */
- derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
+ derr(1, "connect %u.%u.%u.%u:%u error %d\n",
IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
sock_release(sock);
con->sock = NULL;
+ con->error_msg = "connect error";
}
if (ret < 0)
}
/*
- * setup listening socket
+ * set up listening socket
*/
static int ceph_tcp_listen(struct ceph_messenger *msgr)
{
ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret)
return ret;
-
sock->sk->sk_allocation = GFP_NOFS;
-
ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(char *)&optval, sizeof(optval));
if (ret < 0) {
- derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
+ derr(0, "failed to set SO_REUSEADDR: %d\n", ret);
goto err;
}
derr(0, "failed to getsockname: %d\n", ret);
goto err;
}
- dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
-
- ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
- (char *)&optval, sizeof(optval));
- if (ret < 0) {
- derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
- goto err;
- }
+ dout(0, "listening on %u.%u.%u.%u:%u\n", IPQUADPORT(*myaddr));
- /* TBD: probaby want to tune the backlog queue .. */
- ret = sock->ops->listen(sock, CEPH_MSGR_BACKUP);
- if (ret < 0) {
- derr(0, "kernel_listen error: %d\n", ret);
- goto err;
- }
+ /* we don't care too much if this works or not */
+ sock->ops->listen(sock, CEPH_MSGR_BACKUP);
/* ok! */
msgr->listen_sock = sock;
listen_sock_callbacks(sock, msgr);
- return ret;
+ return 0;
err:
sock_release(sock);
}
/*
- * accept a connection
+ * accept a connection
*/
static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
{
- int ret;
- struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
- int len;
struct socket *sock;
+ int ret;
ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret)
return ret;
con->sock = sock;
-
sock->sk->sk_allocation = GFP_NOFS;
ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
sock->ops = lsock->ops;
sock->type = lsock->type;
- ret = sock->ops->getname(sock, paddr, &len, 2);
- if (ret < 0) {
- derr(0, "getname error: %d\n", ret);
- goto err;
- }
-
- /* setup callbacks */
set_sock_callbacks(sock, con);
-
return ret;
err:
return ret;
}
-/*
- * receive a message this may return after partial send
- */
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
struct kvec iov = {buf, len};
- struct msghdr msg = {.msg_flags = 0};
- int rlen = 0; /* length read */
+ struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
- msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
- rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
- return(rlen);
+ return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
}
-
/*
- * Send a message this may return after partial send
+ * write something. @more is true if caller will be sending more data
+ * shortly.
*/
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
size_t kvlen, size_t len, int more)
{
- struct msghdr msg = {.msg_flags = 0};
- int rlen = 0;
+ struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
- msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
if (more)
msg.msg_flags |= MSG_MORE;
else
msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
- rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
-
- return(rlen);
+ return kernel_sendmsg(sock, &msg, iov, kvlen, len);
}
-
/*
* create a new connection.
*/
static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
{
struct ceph_connection *con;
+
con = kzalloc(sizeof(struct ceph_connection), GFP_NOFS);
if (con == NULL)
return NULL;
-
con->msgr = msgr;
atomic_set(&con->nref, 1);
-
INIT_LIST_HEAD(&con->list_all);
INIT_LIST_HEAD(&con->list_bucket);
-
spin_lock_init(&con->out_queue_lock);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
-
INIT_DELAYED_WORK(&con->work, con_work);
-
return con;
}
/*
- * the radix_tree has an unsigned long key and void * value. since
- * ceph_entity_addr is bigger than that, we use a trivial hash key, and
- * point to a list_head in ceph_connection, as you would with a hash
- * table. in the rare event that the trivial hash collides, we just
- * traverse the (short) list.
+ * The con_tree radix_tree has an unsigned long key and void * value.
+ * Since ceph_entity_addr is bigger than that, we use a trivial hash
+ * key, and point to a list_head in ceph_connection, as you would with
+ * a hash table. If the trivial hash collides, we just traverse the
+ * (hopefully short) list until we find what we want.
*/
static unsigned long hash_addr(struct ceph_entity_addr *addr)
{
unsigned long key;
+
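+ /* fold the ip address and port into a single trivial key */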
key = *(__u32 *)&addr->ipaddr.sin_addr.s_addr;
key ^= *(__u16 *)&addr->ipaddr.sin_port;
return key;
}
/*
- * get an existing connection, if any, for given addr
+ * Get an existing connection, if any, for given addr. Note that we
+ * may need to traverse the list_bucket list hanging off "head."
+ *
+ * called under con_lock.
*/
static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
struct ceph_entity_addr *addr)
struct list_head *head, *p;
unsigned long key = hash_addr(addr);
- /* existing? */
head = radix_tree_lookup(&msgr->con_tree, key);
if (head == NULL)
return NULL;
if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0)
goto yes;
}
-
return NULL;
yes:
/*
- * close connection socket
+ * Shutdown/close the socket for the given connection.
*/
static int con_close_socket(struct ceph_connection *con)
{
int rc;
+
dout(10, "con_close_socket on %p sock %p\n", con, con->sock);
if (!con->sock)
return 0;
rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
sock_release(con->sock);
con->sock = NULL;
-
return rc;
}
}
/*
- * add to connections tree
+ * add a connection to the con_tree.
+ *
+ * called under con_lock.
*/
static int __register_connection(struct ceph_messenger *msgr,
- struct ceph_connection *con)
+ struct ceph_connection *con)
{
struct list_head *head;
unsigned long key = hash_addr(&con->peer_addr);
int rc = 0;
- /* inc ref count */
+ dout(20, "register_connection %p %d -> %d\n", con,
+ atomic_read(&con->nref), atomic_read(&con->nref) + 1);
atomic_inc(&con->nref);
- dout(20, "add_connection %p %d -> %d\n", con,
- atomic_read(&con->nref) - 1, atomic_read(&con->nref));
+ /* if we're just ACCEPTING this connection, it is already on the
+ * con_all and con_accepting lists. */
if (test_and_clear_bit(ACCEPTING, &con->state)) {
list_del_init(&con->list_bucket);
put_connection(con);
head = radix_tree_lookup(&msgr->con_tree, key);
if (head) {
- dout(20, "add_connection %p in existing bucket %lu head %p\n",
+ dout(20, "register_connection %p in old bucket %lu head %p\n",
con, key, head);
list_add(&con->list_bucket, head);
} else {
- dout(20, "add_connection %p in new bucket %lu head %p\n", con,
- key, &con->list_bucket);
- INIT_LIST_HEAD(&con->list_bucket); /* empty */
+ dout(20, "register_connection %p in new bucket %lu head %p\n",
+ con, key, &con->list_bucket);
+ INIT_LIST_HEAD(&con->list_bucket); /* empty */
rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket);
-
if (rc < 0) {
list_del(&con->list_all);
+ put_connection(con);
return rc;
}
}
set_bit(REGISTERED, &con->state);
-
return 0;
}
+/*
+ * called under con_lock.
+ */
static void add_connection_accepting(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
}
/*
- * remove connection from all list.
- * also, from con_tree radix tree, if it should have been there
+ * Remove connection from all list. Also, from con_tree, if it should
+ * have been there.
+ *
+ * called under con_lock.
*/
static void __remove_connection(struct ceph_messenger *msgr,
struct ceph_connection *con)
}
list_del_init(&con->list_all);
if (test_bit(REGISTERED, &con->state)) {
- /* remove from con_tree too */
key = hash_addr(&con->peer_addr);
if (list_empty(&con->list_bucket)) {
- /* last one */
+ /* last one in this bucket */
dout(20, "__remove_connection %p and bucket %lu\n",
con, key);
radix_tree_delete(&msgr->con_tree, key);
} else {
+ /* if we share this bucket, and the radix tree points
+ * to us, adjust it to point to the next guy. */
slot = radix_tree_lookup_slot(&msgr->con_tree, key);
val = radix_tree_deref_slot(slot);
dout(20, "__remove_connection %p from bucket %lu "
list_del_init(&con->list_bucket);
}
}
- if (test_bit(ACCEPTING, &con->state))
+ if (test_and_clear_bit(ACCEPTING, &con->state))
list_del_init(&con->list_bucket);
put_connection(con);
}
spin_unlock(&msgr->con_lock);
}
-
-/*
- * atomically queue work on a connection. bump reference to avoid
- * races with connection teardown.
- */
-void ceph_queue_con(struct ceph_connection *con)
-{
- if (test_bit(WAIT, &con->state) ||
- test_bit(CLOSED, &con->state) ||
- test_bit(BACKOFF, &con->state)) {
- dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED|BACKOFF\n",
- con);
- return;
- }
-
- atomic_inc(&con->nref);
- dout(40, "ceph_queue_con %p %d -> %d\n", con,
- atomic_read(&con->nref) - 1, atomic_read(&con->nref));
-
- set_bit(QUEUED, &con->state);
- if (test_bit(BUSY, &con->state) ||
- !queue_work(ceph_msgr_wq, &con->work.work)) {
- dout(40, "ceph_queue_con %p - already BUSY or queued\n", con);
- put_connection(con);
- }
-}
-
-
/*
- * failure case
- * A retry mechanism is used with exponential backoff
+ * replace another connection
+ * (old and new should be for the _same_ peer,
+ * and thus in the same bucket in the radix tree)
*/
-static void ceph_fault(struct ceph_connection *con)
+static void __replace_connection(struct ceph_messenger *msgr,
+ struct ceph_connection *old,
+ struct ceph_connection *new)
{
- derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name),
- IPQUADPORT(con->peer_addr.ipaddr), con->error_msg);
- dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
- con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
+ /* replace in con_tree */
+ if (list_empty(&old->list_bucket)) {
+ /* old was alone in its bucket; point the radix tree
+ * slot at new instead */
+ unsigned long key = hash_addr(&new->peer_addr);
+ void **slot;
- if (test_bit(LOSSYTX, &con->state)) {
- dout(30, "fault on LOSSYTX channel\n");
- remove_connection(con->msgr, con);
- return;
+ slot = radix_tree_lookup_slot(&msgr->con_tree, key);
+ BUG_ON(radix_tree_deref_slot(slot) != &old->list_bucket);
+ radix_tree_replace_slot(slot, &new->list_bucket);
+ } else {
+ /* old shares its bucket; just take its place in the
+ * bucket list */
+ list_add(&new->list_bucket, &old->list_bucket);
+ list_del_init(&old->list_bucket);
}
- con_close_socket(con);
-
- /* hmm? */
- BUG_ON(test_bit(WAIT, &con->state));
+ /* take over the old connection's message queue */
+ spin_lock(&old->out_queue_lock);
+ if (!list_empty(&old->out_queue))
+ list_splice_init(&old->out_queue, &new->out_queue);
+ spin_unlock(&old->out_queue_lock);
- /*
- * If there are no messages in the queue, place the
- * connection in a STANDBY state. otherwise, retry with
- * delay
- */
- spin_lock(&con->out_queue_lock);
- if (list_empty(&con->out_queue)) {
- dout(10, "fault setting STANDBY\n");
- set_bit(STANDBY, &con->state);
- spin_unlock(&con->out_queue_lock);
- return;
- }
+ new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
+ new->out_seq = old->out_seq;
- dout(10, "fault setting BACKOFF\n");
- set_bit(BACKOFF, &con->state);
+ set_bit(CLOSED, &old->state);
+ put_connection(old); /* dec reference count */
- if (con->delay == 0)
- con->delay = BASE_DELAY_INTERVAL;
- else if (con->delay < MAX_DELAY_INTERVAL)
- con->delay *= 2;
+ clear_bit(ACCEPTING, &new->state);
+}
- atomic_inc(&con->nref);
- dout(40, "fault queueing %p %d -> %d delay %lu\n", con,
- atomic_read(&con->nref) - 1, atomic_read(&con->nref),
- con->delay);
- queue_delayed_work(ceph_msgr_wq, &con->work,
- round_jiffies_relative(con->delay));
- list_splice_init(&con->out_sent, &con->out_queue);
- spin_unlock(&con->out_queue_lock);
-}
-/*
- * non-blocking versions
- *
- * these should be called while holding con->con_lock
- */
/*
- * write as much of con->out_partial to the socket as we can.
- * 1 -> done
- * 0 -> socket full, but more to do
- * <0 -> error
+ * We maintain a global counter to order connection attempts. Get
+ * a unique seq greater than @gt.
*/
-static int write_partial_kvec(struct ceph_connection *con)
+static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
- int ret;
+ u32 ret;
- dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
- while (con->out_kvec_bytes > 0) {
- ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
- con->out_kvec_left, con->out_kvec_bytes,
- con->out_more);
- if (ret <= 0)
- goto out;
- con->out_kvec_bytes -= ret;
- if (con->out_kvec_bytes == 0)
- break; /* done */
- while (ret > 0) {
- if (ret >= con->out_kvec_cur->iov_len) {
- ret -= con->out_kvec_cur->iov_len;
- con->out_kvec_cur++;
- con->out_kvec_left--;
- } else {
- con->out_kvec_cur->iov_len -= ret;
- con->out_kvec_cur->iov_base += ret;
- ret = 0;
- break;
- }
- }
- }
- con->out_kvec_left = 0;
- ret = 1;
-out:
- dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con,
- con->out_kvec_left, con->out_kvec_bytes, ret);
- return ret; /* done! */
+ spin_lock(&msgr->global_seq_lock);
+ if (msgr->global_seq < gt)
+ msgr->global_seq = gt;
+ ret = ++msgr->global_seq;
+ spin_unlock(&msgr->global_seq_lock);
+ return ret;
}
-static int write_partial_msg_pages(struct ceph_connection *con,
- struct ceph_msg *msg)
-{
- int ret;
- unsigned data_len = le32_to_cpu(msg->hdr.data_len);
- struct ceph_client *client = con->msgr->parent;
- int crc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC);
- size_t len;
-
- dout(30, "write_partial_msg_pages con %p msg %p on %d/%d offset %d\n",
- con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
- con->out_msg_pos.page_pos);
-
- while (con->out_msg_pos.page < con->out_msg->nr_pages) {
- struct page *page = NULL;
- void *kaddr = NULL;
-
- mutex_lock(&msg->page_mutex);
- if (msg->pages) {
- page = msg->pages[con->out_msg_pos.page];
- if (crc)
- kaddr = kmap(page);
- } else {
- /*dout(60, "using zero page\n");*/
- if (crc)
- kaddr = page_address(con->msgr->zero_page);
- }
- len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
- (int)(data_len - con->out_msg_pos.data_pos));
- if (crc && !con->out_msg_pos.did_page_crc) {
- void *base = kaddr + con->out_msg_pos.page_pos;
-
- con->out_msg->footer.data_crc =
- cpu_to_le32(crc32c_le(le32_to_cpu(con->out_msg->footer.data_crc),
- base, len));
- con->out_msg_pos.did_page_crc = 1;
- }
-
- if (msg->pages)
- ret = kernel_sendpage(con->sock, page,
- con->out_msg_pos.page_pos, len,
- MSG_DONTWAIT | MSG_NOSIGNAL |
- MSG_MORE);
- else
- ret = kernel_sendpage(con->sock, con->msgr->zero_page,
- con->out_msg_pos.page_pos, len,
- MSG_DONTWAIT | MSG_NOSIGNAL |
- MSG_MORE);
-
- if (crc && msg->pages)
- kunmap(page);
-
- mutex_unlock(&msg->page_mutex);
- if (ret <= 0)
- goto out;
- con->out_msg_pos.data_pos += ret;
- con->out_msg_pos.page_pos += ret;
- if (ret == len) {
- con->out_msg_pos.page_pos = 0;
- con->out_msg_pos.page++;
- con->out_msg_pos.did_page_crc = 0;
- }
- }
- /* done with data pages */
- dout(30, "write_partial_msg_pages wrote all pages on %p\n", con);
-
- /* queue up footer, too */
- if (!crc)
- con->out_msg->footer.flags |= cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
- con->out_kvec[0].iov_base = &con->out_msg->footer;
- con->out_kvec_bytes = con->out_kvec[0].iov_len =
- sizeof(con->out_msg->footer);
- con->out_kvec_left = 1;
- con->out_kvec_cur = con->out_kvec;
- con->out_msg = NULL;
- con->out_more = 0; /* end of message */
- ret = 1;
-out:
- return ret;
+/*
+ * Prepare footer for currently outgoing message, and finish things
+ * off. Assumes out_kvec* are already valid; we just add on to the end.
+ */
+static void prepare_write_message_footer(struct ceph_connection *con, int v)
+{
+ struct ceph_msg *m = con->out_msg;
+
+ con->out_kvec[v].iov_base = &m->footer;
+ con->out_kvec[v].iov_len = sizeof(m->footer);
+ con->out_kvec_bytes += sizeof(m->footer);
+ con->out_kvec_left++;
+ con->out_msg = NULL; /* we're done with this one */
+ con->out_more = 0; /* end of message */
}
-
/*
- * build out_partial based on the next outgoing message in the queue.
+ * Prepare headers for the next outgoing message.
*/
static void prepare_write_message(struct ceph_connection *con)
{
struct ceph_msg *m;
int v = 0;
+
con->out_kvec_bytes = 0;
- /* ack? */
+ /* Sneak an ack in there first? If we can get it into the same
+ * TCP packet, that's a good thing. */
if (con->in_seq > con->in_seq_acked) {
con->in_seq_acked = con->in_seq;
con->out_kvec[v].iov_base = &tag_ack;
con->out_kvec[v++].iov_len = 1;
- con->out32 = cpu_to_le32(con->in_seq_acked);
- con->out_kvec[v].iov_base = &con->out32;
+ con->out_temp_ack = cpu_to_le32(con->in_seq_acked);
+ con->out_kvec[v].iov_base = &con->out_temp_ack;
con->out_kvec[v++].iov_len = 4;
con->out_kvec_bytes = 1 + 4;
}
- /* move to sending/sent list */
+ /* move message to sending/sent list */
m = list_entry(con->out_queue.next,
struct ceph_msg, list_head);
list_del_init(&m->list_head);
list_add_tail(&m->list_head, &con->out_sent);
- con->out_msg = m; /* we dont bother taking a reference here. */
+ con->out_msg = m; /* we don't bother taking a reference here. */
- /* encode header */
dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len),
con->out_kvec_left = v;
con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
con->out_kvec_cur = con->out_kvec;
- con->out_more = 1; /* data? */
-
- /* pages */
- con->out_msg_pos.page = 0;
- con->out_msg_pos.page_pos = le32_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
- con->out_msg_pos.data_pos = 0;
- con->out_msg_pos.did_page_crc = 0;
/* fill in crc (except data pages), footer */
- con->out_msg->hdr.crc = cpu_to_le32(crc32c_le(0, (void *)&m->hdr,
- sizeof(m->hdr) - sizeof(m->hdr.crc)));
+ con->out_msg->hdr.crc =
+ cpu_to_le32(crc32c_le(0, (void *)&m->hdr,
+ sizeof(m->hdr) - sizeof(m->hdr.crc)));
con->out_msg->footer.flags = 0;
- con->out_msg->footer.front_crc = cpu_to_le32(crc32c_le(0, m->front.iov_base,
- m->front.iov_len));
+ con->out_msg->footer.front_crc =
+ cpu_to_le32(crc32c_le(0, m->front.iov_base, m->front.iov_len));
con->out_msg->footer.data_crc = 0;
+ /* is there a data payload? */
+ if (le32_to_cpu(m->hdr.data_len) != 0) {
+ /* initialize page iterator */
+ con->out_msg_pos.page = 0;
+ con->out_msg_pos.page_pos =
+ le32_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ con->out_msg_pos.data_pos = 0;
+ con->out_msg_pos.did_page_crc = 0;
+ con->out_more = 1; /* data + footer will follow */
+ } else {
+ /* no, queue up footer too and be done */
+ prepare_write_message_footer(con, v);
+ }
+
set_bit(WRITE_PENDING, &con->state);
}
/*
- * prepare an ack for send
+ * Prepare an ack.
*/
static void prepare_write_ack(struct ceph_connection *con)
{
con->out_kvec[0].iov_base = &tag_ack;
con->out_kvec[0].iov_len = 1;
- con->out32 = cpu_to_le32(con->in_seq_acked);
- con->out_kvec[1].iov_base = &con->out32;
+ con->out_temp_ack = cpu_to_le32(con->in_seq_acked);
+ con->out_kvec[1].iov_base = &con->out_temp_ack;
con->out_kvec[1].iov_len = 4;
con->out_kvec_left = 2;
con->out_kvec_bytes = 1 + 4;
set_bit(WRITE_PENDING, &con->state);
}
-static void prepare_read_connect(struct ceph_connection *con)
-{
- con->in_base_pos = 0;
-}
-
-static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
-{
- u32 ret;
- spin_lock(&msgr->global_seq_lock);
- if (msgr->global_seq < gt)
- msgr->global_seq = gt;
- ret = ++msgr->global_seq;
- spin_unlock(&msgr->global_seq_lock);
- return ret;
-}
+/*
+ * Connection negotiation.
+ */
+/*
+ * We connected to a peer and are saying hello.
+ */
static void prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
+ int len = strlen(CEPH_BANNER);
+
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(con->global_seq);
con->out_connect.flags = 0;
con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX;
con->out_kvec[0].iov_base = CEPH_BANNER;
- con->out_kvec[0].iov_len = strlen(CEPH_BANNER);
+ con->out_kvec[0].iov_len = len;
con->out_kvec[1].iov_base = &msgr->inst.addr;
con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
con->out_kvec[2].iov_base = &con->out_connect;
con->out_kvec[2].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 3;
- con->out_kvec_bytes = strlen(CEPH_BANNER) +
- sizeof(msgr->inst.addr) +
+ con->out_kvec_bytes = len + sizeof(msgr->inst.addr) +
sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
+/*
+ * We accepted a connection and are saying hello.
+ */
static void prepare_write_accept_hello(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
set_bit(WRITE_PENDING, &con->state);
}
-static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag)
+/*
+ * Write a single protocol control message (byte).
+ */
+static void prepare_write_tag(struct ceph_connection *con, char *ptag)
{
con->out_kvec[0].iov_base = ptag;
con->out_kvec[0].iov_len = 1;
set_bit(WRITE_PENDING, &con->state);
}
+/*
+ * Negotiation succeeded on an incoming connection, tell the peer.
+ */
static void prepare_write_accept_ready(struct ceph_connection *con)
{
con->out_connect.flags = 0;
set_bit(WRITE_PENDING, &con->state);
}
+/*
+ * The connecting peer needs to try again with a larger connect_seq or
+ * global_seq (as indicated by *ptag).
+ */
static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
__le32 *pseq)
{
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
- /* we'll re-read the connect request, but not the hello */
+ /* we'll re-read the connect request, sans the hello + addr */
con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->msgr->inst.addr);
}
+
+
+
/*
- * worker function when socket is writeable
+ * write as much of pending kvecs to the socket as we can.
+ * 1 -> done
+ * 0 -> socket full, but more to do
+ * <0 -> error
*/
-static int try_write(struct ceph_connection *con)
+static int write_partial_kvec(struct ceph_connection *con)
{
- struct ceph_messenger *msgr = con->msgr;
- int ret = 1;
-
- dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
- atomic_read(&con->nref));
-
-more:
- dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
+ int ret;
- /* initiate connect? */
- if (con->sock == NULL) {
- if (test_and_clear_bit(STANDBY, &con->state))
- con->connect_seq++;
- con->global_seq = get_global_seq(msgr, 0);
- prepare_write_connect(msgr, con);
- prepare_read_connect(con);
- set_bit(CONNECTING, &con->state);
- con->in_tag = CEPH_MSGR_TAG_READY;
- dout(5, "try_write initiating connect on %p new state %lu\n",
- con, con->state);
- BUG_ON(con->sock);
- con->sock = ceph_tcp_connect(con);
- dout(10, "tcp_connect returned %p\n", con->sock);
- if (IS_ERR(con->sock)) {
- con->sock = NULL;
- con->error_msg = "connect error";
- ret = -1;
+ dout(10, "write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
+ while (con->out_kvec_bytes > 0) {
+ ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
+ con->out_kvec_left, con->out_kvec_bytes,
+ con->out_more);
+ if (ret <= 0)
goto out;
+ con->out_kvec_bytes -= ret;
+ if (con->out_kvec_bytes == 0)
+ break; /* done */
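+ /* account for the bytes sent: step past any kvecs that
+ * were fully written, then partway into the current one */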
+ while (ret > 0) {
+ if (ret >= con->out_kvec_cur->iov_len) {
+ ret -= con->out_kvec_cur->iov_len;
+ con->out_kvec_cur++;
+ con->out_kvec_left--;
+ } else {
+ con->out_kvec_cur->iov_len -= ret;
+ con->out_kvec_cur->iov_base += ret;
+ ret = 0;
+ break;
+ }
}
}
+ con->out_kvec_left = 0;
+ ret = 1;
+out:
+ dout(30, "write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
+ con->out_kvec_bytes, con->out_kvec_left, ret);
+ return ret; /* done! */
+}
- /* kvec data queued? */
-more_kvec:
- if (con->out_kvec_left) {
- ret = write_partial_kvec(con);
- if (ret == 0)
- goto done;
- if (ret < 0) {
- dout(30, "try_write write_partial_kvec err %d\n", ret);
- goto done;
+/*
+ * Write as much message data payload as we can. If we finish, queue
+ * up the footer.
+ * 1 -> done, footer is now queued in out_kvec[].
+ * 0 -> socket full, but more to do
+ * <0 -> error
+ */
+static int write_partial_msg_pages(struct ceph_connection *con)
+{
+ struct ceph_client *client = con->msgr->parent;
+ struct ceph_msg *msg = con->out_msg;
+ unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+ size_t len;
+ int crc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC);
+ int ret;
+
+ dout(30, "write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
+ con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
+ con->out_msg_pos.page_pos);
+
+ while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+ struct page *page = NULL;
+ void *kaddr = NULL;
+
+ /*
+ * if we are calculating the data crc (the default), we need
+ * to map the page. if our pages[] has been revoked, use the
+ * zero page.
+ */
+ mutex_lock(&msg->page_mutex);
+ if (msg->pages) {
+ page = msg->pages[con->out_msg_pos.page];
+ if (crc)
+ kaddr = kmap(page);
+ } else {
+ page = con->msgr->zero_page;
+ if (crc)
+ kaddr = page_address(con->msgr->zero_page);
}
- }
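+ /* limit to whatever is left of this page and of the data */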
+ len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
+ (int)(data_len - con->out_msg_pos.data_pos));
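+ /* crc each page only once, even if we send it in pieces */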
+ if (crc && !con->out_msg_pos.did_page_crc) {
+ void *base = kaddr + con->out_msg_pos.page_pos;
+ u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
- /* msg pages? */
- if (con->out_msg) {
- ret = write_partial_msg_pages(con, con->out_msg);
- if (ret == 1)
- goto more_kvec;
- if (ret == 0)
- goto done;
- if (ret < 0) {
- dout(30, "try_write write_partial_msg_pages err %d\n",
- ret);
- goto done;
+ con->out_msg->footer.data_crc =
+ cpu_to_le32(crc32c_le(tmpcrc, base, len));
+ con->out_msg_pos.did_page_crc = 1;
}
- }
- con->out_msg = NULL; /* done with this message. */
- /* anything else pending? */
- spin_lock(&con->out_queue_lock);
- if (!list_empty(&con->out_queue)) {
- prepare_write_message(con);
- } else if (con->in_seq > con->in_seq_acked) {
- prepare_write_ack(con);
- } else {
- clear_bit(WRITE_PENDING, &con->state);
- /* hmm, nothing to do! No more writes pending? */
- dout(30, "try_write nothing else to write.\n");
- spin_unlock(&con->out_queue_lock);
- goto done;
+ ret = kernel_sendpage(con->sock, page,
+ con->out_msg_pos.page_pos, len,
+ MSG_DONTWAIT | MSG_NOSIGNAL |
+ MSG_MORE);
+
+ if (crc && msg->pages)
+ kunmap(page);
+
+ mutex_unlock(&msg->page_mutex);
+ if (ret <= 0)
+ goto out;
+
+ con->out_msg_pos.data_pos += ret;
+ con->out_msg_pos.page_pos += ret;
+ if (ret == len) {
+ con->out_msg_pos.page_pos = 0;
+ con->out_msg_pos.page++;
+ con->out_msg_pos.did_page_crc = 0;
+ }
}
- spin_unlock(&con->out_queue_lock);
- goto more;
-done:
- ret = 0;
+ dout(30, "write_partial_msg_pages %p msg %p done\n", con, msg);
+
+ /* prepare and queue up footer, too */
+ if (!crc)
+ con->out_msg->footer.flags |=
+ cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
+ con->out_kvec_bytes = 0;
+ con->out_kvec_left = 0;
+ con->out_kvec_cur = con->out_kvec;
+ prepare_write_message_footer(con, 0);
+ ret = 1;
out:
- dout(30, "try_write done on %p\n", con);
return ret;
}
+
+
+/*
+ * Prepare to read connection handshake, or an ack.
+ */
+static void prepare_read_connect(struct ceph_connection *con)
+{
+ con->in_base_pos = 0;
+}
+
+static void prepare_read_ack(struct ceph_connection *con)
+{
+ con->in_base_pos = 0;
+}
+
/*
- * prepare to read a message
+ * Prepare to read a message.
*/
static int prepare_read_message(struct ceph_connection *con)
{
int err;
+
BUG_ON(con->in_msg != NULL);
con->in_base_pos = 0;
con->in_msg = ceph_msg_new(0, 0, 0, 0, NULL);
if (IS_ERR(con->in_msg)) {
- /* TBD: we don't check for error in caller, handle it here? */
err = PTR_ERR(con->in_msg);
con->in_msg = NULL;
- derr(1, "kmalloc failure on incoming message %d\n", err);
+ con->error_msg = "out of memory for incoming message";
return err;
}
con->in_front_crc = con->in_data_crc = 0;
return 0;
}
+
+
/*
- * read (part of) a message
+ * Read all or part of the connect-side handshake on a new connection
*/
-static int read_message_partial(struct ceph_connection *con)
+static int read_partial_connect(struct ceph_connection *con)
{
- struct ceph_msg *m = con->in_msg;
- void *p;
- int ret;
- int to, want, left;
- unsigned front_len, data_len, data_off;
+ int ret, to;
+ dout(20, "read_partial_connect %p at %d\n", con, con->in_base_pos);
- dout(20, "read_message_partial con %p msg %p\n", con, m);
+ /* peer's banner */
+ to = strlen(CEPH_BANNER);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = con->in_base_pos;
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&con->in_banner + have,
+ left);
+ if (ret <= 0)
+ goto out;
+ con->in_base_pos += ret;
+ }
- /* header */
- while (con->in_base_pos < sizeof(m->hdr)) {
- left = sizeof(m->hdr) - con->in_base_pos;
+ /* peer's addr */
+ to += sizeof(con->actual_peer_addr);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = sizeof(con->actual_peer_addr) - left;
ret = ceph_tcp_recvmsg(con->sock,
- (char *)&m->hdr + con->in_base_pos,
+ (char *)&con->actual_peer_addr + have,
left);
if (ret <= 0)
- return ret;
+ goto out;
con->in_base_pos += ret;
- if (con->in_base_pos == sizeof(m->hdr)) {
- u32 crc = crc32c_le(0, (void *)&m->hdr,
- sizeof(m->hdr) - sizeof(m->hdr.crc));
- if (crc != le32_to_cpu(m->hdr.crc)) {
- derr(0, "read_message_partial %p bad hdr crc"
- " %u != expected %u\n",
- m, crc, m->hdr.crc);
- return -EIO;
- }
- }
}
- /* front */
- front_len = le32_to_cpu(m->hdr.front_len);
- while (m->front.iov_len < front_len) {
- if (m->front.iov_base == NULL) {
- m->front.iov_base = kmalloc(front_len, GFP_NOFS);
- if (m->front.iov_base == NULL)
- return -ENOMEM;
- }
- left = front_len - m->front.iov_len;
- ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
- m->front.iov_len, left);
+ /* in_tag */
+ to += 1;
+ if (con->in_base_pos < to) {
+ ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
if (ret <= 0)
- return ret;
- m->front.iov_len += ret;
- if (m->front.iov_len == front_len)
- con->in_front_crc = crc32c_le(0, m->front.iov_base,
- m->front.iov_len);
- }
-
- /* (page) data */
- data_len = le32_to_cpu(m->hdr.data_len);
- data_off = le32_to_cpu(m->hdr.data_off);
- if (data_len == 0)
- goto no_data;
- if (m->nr_pages == 0) {
- con->in_msg_pos.page = 0;
- con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
- con->in_msg_pos.data_pos = 0;
- /* find pages for data payload */
- want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
- ret = 0;
- BUG_ON(!con->msgr->prepare_pages);
- ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
- if (ret < 0) {
- dout(10, "prepare_pages failed, skipping payload\n");
- con->in_base_pos = -data_len - sizeof(m->footer);
- ceph_msg_put(con->in_msg);
- con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
- return 0;
- } else {
- BUG_ON(m->nr_pages < want);
- }
- }
- while (con->in_msg_pos.data_pos < data_len) {
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- mutex_lock(&m->page_mutex);
- if (!m->pages) {
- dout(10, "pages revoked during msg read\n");
- mutex_unlock(&m->page_mutex);
- con->in_base_pos = con->in_msg_pos.data_pos - data_len -
- sizeof(m->footer);
- ceph_msg_put(m);
- con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
- return 0;
- }
- p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0)
- con->in_data_crc =
- crc32c_le(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(m->pages[con->in_msg_pos.page]);
- mutex_unlock(&m->page_mutex);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
- }
- }
-
-no_data:
- /* footer */
- to = sizeof(m->hdr) + sizeof(m->footer);
- while (con->in_base_pos < to) {
- left = to - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
- (con->in_base_pos - sizeof(m->hdr)),
- left);
- if (ret <= 0)
- return ret;
- con->in_base_pos += ret;
- }
- dout(20, "read_message_partial got msg %p\n", m);
-
- /* crc ok? */
- if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
- derr(0, "read_message_partial %p front crc %u != expected %u\n",
- con->in_msg,
- con->in_front_crc, m->footer.front_crc);
- return -EIO;
- }
- if (con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
- derr(0, "read_message_partial %p data crc %u != expected %u\n",
- con->in_msg,
- con->in_data_crc, m->footer.data_crc);
- return -EIO;
- }
-
- /* did i learn my ip? */
- if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
- /*
- * in practice, we learn our ip from the first incoming mon
- * message, before anyone else knows we exist, so this is
- * safe.
- */
- con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr;
- dout(10, "read_message_partial learned my addr is "
- "%u.%u.%u.%u:%u\n",
- IPQUADPORT(con->msgr->inst.addr.ipaddr));
- }
-
- return 1; /* done! */
-}
-
-static void process_message(struct ceph_connection *con)
-{
- /* if first message, set peer_name */
- if (con->peer_name.type == 0)
- con->peer_name = con->in_msg->hdr.src.name;
-
- spin_lock(&con->out_queue_lock);
- con->in_seq++;
- spin_unlock(&con->out_queue_lock);
-
- dout(1, "===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n",
- con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
- ENTITY_NAME(con->in_msg->hdr.src.name),
- le16_to_cpu(con->in_msg->hdr.type),
- ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
- le32_to_cpu(con->in_msg->hdr.front_len),
- le32_to_cpu(con->in_msg->hdr.data_len),
- con->in_front_crc, con->in_data_crc);
- con->msgr->dispatch(con->msgr->parent, con->in_msg);
- con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
-}
-
-/*
- * prepare to read an ack
- */
-static void prepare_read_ack(struct ceph_connection *con)
-{
- con->in_base_pos = 0;
-}
-
-/*
- * read (part of) an ack
- */
-static int read_ack_partial(struct ceph_connection *con)
-{
- while (con->in_base_pos < sizeof(con->in_partial_ack)) {
- int left = sizeof(con->in_partial_ack) - con->in_base_pos;
- int ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->in_partial_ack +
- con->in_base_pos, left);
- if (ret <= 0)
- return ret;
- con->in_base_pos += ret;
- }
- return 1; /* done */
-}
-
-static void process_ack(struct ceph_connection *con)
-{
- struct ceph_msg *m;
- u32 ack = le32_to_cpu(con->in_partial_ack);
- u64 seq;
-
- spin_lock(&con->out_queue_lock);
- while (!list_empty(&con->out_sent)) {
- m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
- seq = le64_to_cpu(m->hdr.seq);
- if (seq > ack)
- break;
- dout(5, "got ack for seq %llu type %d at %p\n", seq,
- le16_to_cpu(m->hdr.type), m);
- list_del_init(&m->list_head);
- ceph_msg_put(m);
- }
- spin_unlock(&con->out_queue_lock);
- con->in_tag = CEPH_MSGR_TAG_READY;
-}
-
-
-/*
- * read portion of connect-side handshake on a new connection
- */
-static int read_connect_partial(struct ceph_connection *con)
-{
- int ret, to;
- dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
-
- /* peer's banner */
- to = strlen(CEPH_BANNER);
- while (con->in_base_pos < to) {
- int left = to - con->in_base_pos;
- int have = con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->in_banner + have,
- left);
- if (ret <= 0)
- goto out;
- con->in_base_pos += ret;
- }
-
- /* peer's addr */
- to += sizeof(con->actual_peer_addr);
- while (con->in_base_pos < to) {
- int left = to - con->in_base_pos;
- int have = sizeof(con->actual_peer_addr) - left;
- ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->actual_peer_addr + have,
- left);
- if (ret <= 0)
- goto out;
- con->in_base_pos += ret;
- }
-
- /* in_tag */
- to += 1;
- if (con->in_base_pos < to) {
- ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
- if (ret <= 0)
- goto out;
- con->in_base_pos += ret;
+ goto out;
+ con->in_base_pos += ret;
}
+ /* TAG_READY is followed by a u8 flags byte */
if (con->in_tag == CEPH_MSGR_TAG_READY) {
to++;
if (con->in_base_pos < to) {
}
}
+ /* TAG_RETRY_SESSION is followed by a __le32 connect_seq */
if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
/* peer's connect_seq */
to += sizeof(con->in_connect.connect_seq);
con->in_base_pos += ret;
}
}
+
+ /* TAG_RETRY_GLOBAL is followed by a __le32 global_seq */
if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
/* peer's global_seq */
to += sizeof(con->in_connect.global_seq);
con->in_base_pos += ret;
}
}
- ret = 1;
-out:
- dout(20, "read_connect_partial %p end at %d ret %d\n", con,
- con->in_base_pos, ret);
- dout(20, "read_connect_partial connect_seq = %u, global_seq = %u\n",
- le32_to_cpu(con->in_connect.connect_seq),
+ ret = 1; /* done */
+ dout(20, "read_partial_connect %p connect_seq = %u, global_seq = %u\n",
+ con, le32_to_cpu(con->in_connect.connect_seq),
le32_to_cpu(con->in_connect.global_seq));
- return ret; /* done */
+out:
+ return ret;
+}
+
+/*
+ * Verify the hello banner looks okay.
+ */
+static int verify_hello(struct ceph_connection *con)
+{
+ if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+ derr(10, "connection to/from %u.%u.%u.%u:%u has bad banner\n",
+ IPQUADPORT(con->peer_addr.ipaddr));
+ con->error_msg = "protocol error, bad banner";
+ return -1;
+ }
+ return 0;
}
/*
- * Reset a connection
+ * Reset a connection. Discard all incoming and outgoing messages
+ * and clear *_seq state.
*/
static void reset_connection(struct ceph_connection *con)
{
spin_unlock(&con->out_queue_lock);
}
-static int verify_hello(struct ceph_connection *con)
-{
- if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
- derr(10, "connection from %u.%u.%u.%u:%u with bad banner\n",
- IPQUADPORT(con->peer_addr.ipaddr));
- con->error_msg = "protocol error, bad banner";
- return -1;
- }
- return 0;
-}
static int process_connect(struct ceph_connection *con)
{
if (verify_hello(con) < 0)
return -1;
- /* verify peer addr */
+ /*
+ * Make sure the other end is who we wanted. Note that the other
+ * end may not yet know their ip address, so if it's 0.0.0.0, give
+ * them the benefit of the doubt.
+ */
if (!ceph_entity_addr_is_local(&con->peer_addr,
&con->actual_peer_addr) &&
con->actual_peer_addr.ipaddr.sin_addr.s_addr != 0) {
switch (con->in_tag) {
case CEPH_MSGR_TAG_RESETSESSION:
+ /*
+ * If we connected with a large connect_seq but the peer
+ * has no record of a session with us (no connection, or
+ * connect_seq == 0), they will send RESETSESSION to indicate
+ * that they must have reset their session, and may have
+ * dropped messages.
+ */
dout(10, "process_connect got RESET peer seq %u\n",
le32_to_cpu(con->in_connect.connect_seq));
reset_connection(con);
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
+ /* Tell ceph about it. */
con->msgr->peer_reset(con->msgr->parent, &con->peer_addr,
&con->peer_name);
break;
case CEPH_MSGR_TAG_RETRY_SESSION:
+ /*
+ * If we sent a smaller connect_seq than the peer has, try
+ * again with a larger value.
+ */
dout(10,
"process_connect got RETRY my seq = %u, peer_seq = %u\n",
le32_to_cpu(con->out_connect.connect_seq),
prepare_read_connect(con);
break;
case CEPH_MSGR_TAG_RETRY_GLOBAL:
+ /*
+ * If we sent a smaller global_seq than the peer has, try
+ * again with a larger value.
+ */
dout(10,
"process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
con->global_seq, le32_to_cpu(con->in_connect.global_seq));
prepare_read_connect(con);
break;
case CEPH_MSGR_TAG_WAIT:
+ /*
+ * If there is a connection race (we are opening connections to
+ * each other), one of us may just have to WAIT. We will keep
+ * our queued messages, in expectation of being replaced by an
+ * incoming connection.
+ */
dout(10, "process_connect peer connecting WAIT\n");
set_bit(WAIT, &con->state);
con_close_socket(con);
case CEPH_MSGR_TAG_READY:
dout(10, "process_connect got READY, now open\n");
clear_bit(CONNECTING, &con->state);
- con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
- con->delay = 0; /* reset backoffmemory */
+ if (con->in_flags & CEPH_MSG_CONNECT_LOSSYTX)
+ set_bit(LOSSYRX, &con->state);
+ con->delay = 0; /* reset backoff memory */
break;
default:
derr(1, "process_connect protocol error, will retry\n");
/*
- * read portion of accept-side handshake on a newly accepted connection
+ * Read all or part of the accept-side handshake on a newly accepted
+ * connection.
*/
-static int read_accept_partial(struct ceph_connection *con)
+static int read_partial_accept(struct ceph_connection *con)
{
int ret;
int to;
con->in_base_pos += ret;
}
+ return 1; /* done */
+}
+
+/*
+ * Call after a new connection's handshake has been read.
+ */
+static int process_accept(struct ceph_connection *con)
+{
+ struct ceph_connection *existing;
+ struct ceph_messenger *msgr = con->msgr;
+ u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
+ u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
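+ /* global_seq orders connect attempts messenger-wide;
+ * connect_seq orders attempts for this particular peer */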
+
+ if (verify_hello(con) < 0)
+ return -1;
+
+ /* note flags */
+ if (con->in_flags & CEPH_MSG_CONNECT_LOSSYTX)
+ set_bit(LOSSYRX, &con->state);
+
+ /* do we have an existing connection for this peer? */
+ if (radix_tree_preload(GFP_NOFS) < 0) {
+ derr(10, "ENOMEM in process_accept\n");
+ con->error_msg = "out of memory";
+ return -1;
+ }
+ spin_lock(&msgr->con_lock);
+ existing = __get_connection(msgr, &con->peer_addr);
+ if (existing) {
+ if (peer_gseq < existing->global_seq) {
+ /* peer's global_seq is behind ours; tell it to
+ * retry with our higher value */
+ con->global_seq = existing->global_seq;
+ con->out_connect.global_seq =
+ cpu_to_le32(con->global_seq);
+ prepare_write_accept_retry(con,
+ &tag_retry_global,
+ &con->out_connect.global_seq);
+ } else if (test_bit(LOSSYTX, &existing->state)) {
+ dout(20, "process_accept %p replacing LOSSYTX %p\n",
+ con, existing);
+ reset_connection(existing);
+ __replace_connection(msgr, existing, con);
+ prepare_write_accept_ready(con);
+ } else if (peer_cseq < existing->connect_seq) {
+ if (peer_cseq == 0) {
+ reset_connection(existing);
+ __replace_connection(msgr, existing, con);
+ prepare_write_accept_ready(con);
+ con->msgr->peer_reset(con->msgr->parent,
+ &con->peer_addr,
+ &con->peer_name);
+ } else {
+ /* old attempt or peer didn't get the READY */
+ /* send retry with peers connect seq */
+ con->connect_seq = existing->connect_seq;
+ con->out_connect.connect_seq =
+ cpu_to_le32(con->connect_seq);
+ prepare_write_accept_retry(con,
+ &tag_retry_session,
+ &con->out_connect.connect_seq);
+ }
+ } else if (peer_cseq == existing->connect_seq &&
+ (test_bit(CONNECTING, &existing->state) ||
+ test_bit(STANDBY, &existing->state) ||
+ test_bit(WAIT, &existing->state))) {
+ /* connection race */
+ dout(20, "process_accept connection race state = %lu\n",
+ con->state);
+ if (ceph_entity_addr_equal(&msgr->inst.addr,
+ &con->peer_addr)) {
+ /* incoming connection wins.. */
+ /* replace existing with new connection */
+ __replace_connection(msgr, existing, con);
+ prepare_write_accept_ready(con);
+ } else {
+ /* our existing outgoing connection wins..
+ tell peer to wait for our outgoing
+ connection to go through */
+ prepare_write_tag(con, &tag_wait);
+ }
+ } else if (existing->connect_seq == 0 &&
+ peer_cseq > existing->connect_seq) {
+ /* we reset and already reconnecting */
+ prepare_write_tag(con, &tag_reset);
+ } else {
+ /* reconnect case, replace connection */
+ __replace_connection(msgr, existing, con);
+ prepare_write_accept_ready(con);
+ }
+ put_connection(existing);
+ } else if (peer_cseq > 0) {
+ dout(20, "process_accept no existing connection, we reset\n");
+ prepare_write_tag(con, &tag_reset);
+ } else {
+ dout(20, "process_accept no existing connection, opening\n");
+ __register_connection(msgr, con);
+ con->global_seq = peer_gseq;
+ con->connect_seq = peer_cseq + 1;
+ prepare_write_accept_ready(con);
+ }
+ spin_unlock(&msgr->con_lock);
+ radix_tree_preload_end();
+
+ ceph_queue_con(con);
+ put_connection(con);
+ return 0;
+}
+
+/*
+ * read (part of) an ack
+ */
+static int read_partial_ack(struct ceph_connection *con)
+{
+ while (con->in_base_pos < sizeof(con->in_temp_ack)) {
+ int left = sizeof(con->in_temp_ack) - con->in_base_pos;
+ int ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&con->in_temp_ack +
+ con->in_base_pos, left);
+ if (ret <= 0)
+ return ret;
+ con->in_base_pos += ret;
+ }
+ return 1; /* done */
+}
+
+/*
+ * We can finally discard anything that's been acked.
+ */
+static void process_ack(struct ceph_connection *con)
+{
+ struct ceph_msg *m;
+ u32 ack = le32_to_cpu(con->in_temp_ack);
+ u64 seq;
+
+ spin_lock(&con->out_queue_lock);
+ while (!list_empty(&con->out_sent)) {
+ m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
+ seq = le64_to_cpu(m->hdr.seq);
+ if (seq > ack)
+ break;
+ dout(5, "got ack for seq %llu type %d at %p\n", seq,
+ le16_to_cpu(m->hdr.type), m);
+ list_del_init(&m->list_head);
+ ceph_msg_put(m);
+ }
+ spin_unlock(&con->out_queue_lock);
+ con->in_tag = CEPH_MSGR_TAG_READY;
+}
+
+
+
+
+
+
+/*
+ * read (part of) a message.
+ */
+static int read_partial_message(struct ceph_connection *con)
+{
+ struct ceph_msg *m = con->in_msg;
+ void *p;
+ int ret;
+ int to, want, left;
+ unsigned front_len, data_len, data_off;
+
+ dout(20, "read_partial_message con %p msg %p\n", con, m);
+
+ /* header */
+ while (con->in_base_pos < sizeof(m->hdr)) {
+ left = sizeof(m->hdr) - con->in_base_pos;
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&m->hdr + con->in_base_pos,
+ left);
+ if (ret <= 0)
+ return ret;
+ con->in_base_pos += ret;
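+ /* got the whole header? verify its crc before we
+ * trust the length fields in it */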
+ if (con->in_base_pos == sizeof(m->hdr)) {
+ u32 crc = crc32c_le(0, (void *)&m->hdr,
+ sizeof(m->hdr) - sizeof(m->hdr.crc));
+ if (crc != le32_to_cpu(m->hdr.crc)) {
+ derr(0, "read_partial_message %p bad hdr crc"
+ " %u != expected %u\n",
+ m, crc, le32_to_cpu(m->hdr.crc));
+ return -EIO;
+ }
+ }
+ }
+
+ /* front */
+ front_len = le32_to_cpu(m->hdr.front_len);
+ while (m->front.iov_len < front_len) {
+ if (m->front.iov_base == NULL) {
+ m->front.iov_base = kmalloc(front_len, GFP_NOFS);
+ if (m->front.iov_base == NULL)
+ return -ENOMEM;
+ }
+ left = front_len - m->front.iov_len;
+ ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
+ m->front.iov_len, left);
+ if (ret <= 0)
+ return ret;
+ m->front.iov_len += ret;
+ if (m->front.iov_len == front_len)
+ con->in_front_crc = crc32c_le(0, m->front.iov_base,
+ m->front.iov_len);
+ }
+
+ /* (page) data */
+ data_len = le32_to_cpu(m->hdr.data_len);
+ data_off = le32_to_cpu(m->hdr.data_off);
+ if (data_len == 0)
+ goto no_data;
+ if (m->nr_pages == 0) {
+ con->in_msg_pos.page = 0;
+ con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ con->in_msg_pos.data_pos = 0;
+ /* find pages for data payload */
+ want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
+ ret = 0;
+ BUG_ON(!con->msgr->prepare_pages);
+ ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
+ if (ret < 0) {
+ dout(10, "prepare_pages failed, skipping payload\n");
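+ /* a negative in_base_pos makes try_read discard
+ * the remaining payload and footer */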
+ con->in_base_pos = -data_len - sizeof(m->footer);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ return 0;
+ }
+ BUG_ON(m->nr_pages < want);
+ }
+ while (con->in_msg_pos.data_pos < data_len) {
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+ mutex_lock(&m->page_mutex);
+ if (!m->pages) {
+ dout(10, "pages revoked during msg read\n");
+ mutex_unlock(&m->page_mutex);
+ con->in_base_pos = con->in_msg_pos.data_pos - data_len -
+ sizeof(m->footer);
+ ceph_msg_put(m);
+ con->in_msg = NULL;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ return 0;
+ }
+ p = kmap(m->pages[con->in_msg_pos.page]);
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0)
+ con->in_data_crc =
+ crc32c_le(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(m->pages[con->in_msg_pos.page]);
+ mutex_unlock(&m->page_mutex);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+ con->in_msg_pos.page_pos = 0;
+ con->in_msg_pos.page++;
+ }
+ }
+
+no_data:
+ /* footer */
+ to = sizeof(m->hdr) + sizeof(m->footer);
+ while (con->in_base_pos < to) {
+ left = to - con->in_base_pos;
+ ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
+ (con->in_base_pos - sizeof(m->hdr)),
+ left);
+ if (ret <= 0)
+ return ret;
+ con->in_base_pos += ret;
+ }
+ dout(20, "read_partial_message got msg %p\n", m);
+
+ /* crc ok? */
+ if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
+ derr(0, "read_partial_message %p front crc %u != expected %u\n",
+ con->in_msg,
+ con->in_front_crc, le32_to_cpu(m->footer.front_crc));
+ return -EIO;
+ }
+ if (con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
+ derr(0, "read_partial_message %p data crc %u != expected %u\n",
+ con->in_msg,
+ con->in_data_crc, le32_to_cpu(m->footer.data_crc));
+ return -EIO;
+ }
+
+ /* did i learn my ip? */
+ if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
+ /*
+ * in practice, we learn our ip from the first incoming mon
+ * message, before anyone else knows we exist, so this is
+ * safe.
+ */
+ con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr;
+ dout(10, "read_partial_message learned my addr is "
+ "%u.%u.%u.%u:%u\n",
+ IPQUADPORT(con->msgr->inst.addr.ipaddr));
+ }
+
+ return 1; /* done! */
+}
+
+/*
+ * Process message. This happens in the worker thread. The callback should
+ * be careful not to do anything that waits on other incoming messages or it
+ * may deadlock.
+ */
+static void process_message(struct ceph_connection *con)
+{
+ /* if first message, set peer_name */
+ if (con->peer_name.type == 0)
+ con->peer_name = con->in_msg->hdr.src.name;
+
+ spin_lock(&con->out_queue_lock);
+ con->in_seq++;
+ spin_unlock(&con->out_queue_lock);
+
+ dout(1, "===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n",
+ con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
+ ENTITY_NAME(con->in_msg->hdr.src.name),
+ le16_to_cpu(con->in_msg->hdr.type),
+ ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
+ le32_to_cpu(con->in_msg->hdr.front_len),
+ le32_to_cpu(con->in_msg->hdr.data_len),
+ con->in_front_crc, con->in_data_crc);
+ con->msgr->dispatch(con->msgr->parent, con->in_msg);
+ con->in_msg = NULL;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+}
+
- return 1; /* done */
-}
-/*
- * replace another connection
- * (old and new should be for the _same_ peer,
- * and thus in the same pos in the radix tree)
- */
-static void __replace_connection(struct ceph_messenger *msgr,
- struct ceph_connection *old,
- struct ceph_connection *new)
-{
- /* take old connections message queue */
- spin_lock(&old->out_queue_lock);
- if (!list_empty(&old->out_queue))
- list_splice_init(&new->out_queue, &old->out_queue);
- spin_unlock(&old->out_queue_lock);
- new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
- new->out_seq = old->out_seq;
- /* replace list entry */
- list_add(&new->list_bucket, &old->list_bucket);
- list_del_init(&old->list_bucket);
- set_bit(CLOSED, &old->state);
- put_connection(old); /* dec reference count */
- clear_bit(ACCEPTING, &new->state);
- prepare_write_accept_ready(new);
-}
/*
- * call after a new connection's handshake has completed
+ * Write something to the socket. Called in a worker thread when the
+ * socket appears to be writeable and we have something ready to send.
*/
-static int process_accept(struct ceph_connection *con)
+static int try_write(struct ceph_connection *con)
{
- struct ceph_connection *existing;
struct ceph_messenger *msgr = con->msgr;
- u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
- u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
+ int ret = 1;
- if (verify_hello(con) < 0)
- return -1;
+ dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
+ atomic_read(&con->nref));
- /* note flags */
- con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
+more:
+ dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
- /* connect */
- /* do we have an existing connection for this peer? */
- if (radix_tree_preload(GFP_NOFS) < 0) {
- derr(10, "ENOMEM in process_accept\n");
- con->error_msg = "out of memory";
- return -1;
+ /* open the socket first? */
+ if (con->sock == NULL) {
+ /*
+ * if we were STANDBY and are reconnecting _this_
+ * connection, bump connect_seq now. Always bump
+ * global_seq.
+ */
+ if (test_and_clear_bit(STANDBY, &con->state))
+ con->connect_seq++;
+ con->global_seq = get_global_seq(msgr, 0);
+
+ prepare_write_connect(msgr, con);
+ prepare_read_connect(con);
+ set_bit(CONNECTING, &con->state);
+
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ dout(5, "try_write initiating connect on %p new state %lu\n",
+ con, con->state);
+ con->sock = ceph_tcp_connect(con);
+ if (IS_ERR(con->sock)) {
+ con->sock = NULL;
+ con->error_msg = "connect error";
+ ret = -1;
+ goto out;
+ }
}
- spin_lock(&msgr->con_lock);
- existing = __get_connection(msgr, &con->peer_addr);
- if (existing) {
- if (peer_gseq < existing->global_seq) {
- /* retry_global */
- con->global_seq = existing->global_seq;
- con->out_connect.global_seq =
- cpu_to_le32(con->global_seq);
- prepare_write_accept_retry(con,
- &tag_retry_global,
- &con->out_connect.global_seq);
- } else if (test_bit(LOSSYTX, &existing->state)) {
- dout(20, "process_accept replacing existing LOSSYTX %p\n",
- existing);
- reset_connection(existing);
- __replace_connection(msgr, existing, con);
- } else if (peer_cseq < existing->connect_seq) {
- if (peer_cseq == 0) {
- /* reset existing connection */
- reset_connection(existing);
- /* replace connection */
- __replace_connection(msgr, existing, con);
- con->msgr->peer_reset(con->msgr->parent,
- &con->peer_addr,
- &con->peer_name);
- } else {
- /* old attempt or peer didn't get the READY */
- /* send retry with peers connect seq */
- con->connect_seq = existing->connect_seq;
- con->out_connect.connect_seq =
- cpu_to_le32(con->connect_seq);
- prepare_write_accept_retry(con,
- &tag_retry_session,
- &con->out_connect.connect_seq);
- }
- } else if (peer_cseq == existing->connect_seq &&
- (test_bit(CONNECTING, &existing->state) ||
- test_bit(STANDBY, &existing->state) ||
- test_bit(WAIT, &existing->state))) {
- /* connection race */
- dout(20, "process_accept connection race state = %lu\n",
- con->state);
- if (ceph_entity_addr_equal(&msgr->inst.addr,
- &con->peer_addr)) {
- /* incoming connection wins.. */
- /* replace existing with new connection */
- __replace_connection(msgr, existing, con);
- } else {
- /* our existing outgoing connection wins..
- tell peer to wait for our outgoing
- connection to go through */
- prepare_write_accept_reply(con, &tag_wait);
- }
- } else if (existing->connect_seq == 0 &&
- peer_cseq > existing->connect_seq) {
- /* we reset and already reconnecting */
- prepare_write_accept_reply(con, &tag_reset);
- } else {
- /* reconnect case, replace connection */
- __replace_connection(msgr, existing, con);
+
+more_kvec:
+ /* kvec data queued? */
+ if (con->out_kvec_left) {
+ ret = write_partial_kvec(con);
+ if (ret == 0)
+ goto done;
+ if (ret < 0) {
+ dout(30, "try_write write_partial_kvec err %d\n", ret);
+ goto done;
}
- put_connection(existing);
- } else if (peer_cseq > 0) {
- dout(20, "process_accept no existing connection, we reset\n");
- prepare_write_accept_reply(con, &tag_reset);
- } else {
- dout(20, "process_accept no existing connection, opening\n");
- __register_connection(msgr, con);
- con->global_seq = peer_gseq;
- con->connect_seq = peer_cseq + 1;
- prepare_write_accept_ready(con);
}
- spin_unlock(&msgr->con_lock);
- radix_tree_preload_end();
- ceph_queue_con(con);
- put_connection(con);
- return 0;
+ /* msg pages? */
+ if (con->out_msg) {
+ ret = write_partial_msg_pages(con);
+ if (ret == 1)
+ goto more_kvec; /* we need to send the footer, too! */
+ if (ret == 0)
+ goto done;
+ if (ret < 0) {
+ dout(30, "try_write write_partial_msg_pages err %d\n",
+ ret);
+ goto done;
+ }
+ }
+
+ /* is anything else pending? */
+ spin_lock(&con->out_queue_lock);
+ if (!list_empty(&con->out_queue)) {
+ prepare_write_message(con);
+ spin_unlock(&con->out_queue_lock);
+ goto more;
+ }
+ if (con->in_seq > con->in_seq_acked) {
+ prepare_write_ack(con);
+ spin_unlock(&con->out_queue_lock);
+ goto more;
+ }
+
+ /* Nothing to do! */
+ clear_bit(WRITE_PENDING, &con->state);
+ dout(30, "try_write nothing else to write.\n");
+ spin_unlock(&con->out_queue_lock);
+done:
+ ret = 0;
+out:
+ dout(30, "try_write done on %p\n", con);
+ return ret;
}
+
/*
- * worker function when data is available on the socket
+ * Read what we can from the socket.
*/
static int try_read(struct ceph_connection *con)
{
more:
if (test_bit(ACCEPTING, &con->state)) {
dout(20, "try_read accepting\n");
- ret = read_accept_partial(con);
+ ret = read_partial_accept(con);
if (ret <= 0)
goto done;
if (process_accept(con) < 0) {
}
if (test_bit(CONNECTING, &con->state)) {
dout(20, "try_read connecting\n");
- ret = read_connect_partial(con);
+ ret = read_partial_connect(con);
if (ret <= 0)
goto done;
if (process_connect(con) < 0) {
}
if (con->in_base_pos < 0) {
- /* skipping + discarding content */
+ /*
+ * skipping + discarding content.
+ *
+ * FIXME: there must be a better way to do this!
+ */
static char buf[1024];
int skip = min(1024, -con->in_base_pos);
dout(20, "skipping %d / %d bytes\n", skip, -con->in_base_pos);
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_READY) {
+ /*
+ * what's next?
+ */
ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
if (ret <= 0)
goto done;
}
}
if (con->in_tag == CEPH_MSGR_TAG_MSG) {
- ret = read_message_partial(con);
+ ret = read_partial_message(con);
if (ret == -EIO) {
con->error_msg = "bad crc";
goto out;
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_ACK) {
- ret = read_ack_partial(con);
+ ret = read_partial_ack(con);
if (ret <= 0)
goto done;
process_ack(con);
}
+/*
+ * Atomically queue work on a connection. Bump @con reference to
+ * avoid races with connection teardown.
+ *
+ * There is some trickery going on with QUEUED, BUSY, and BACKOFF:
+ *
+ * QUEUED marks that there is work to do; BUSY means a worker is
+ * currently doing it. If the connection is BUSY, or the work is
+ * already queued, we just drop the extra reference and count on the
+ * active worker to notice the freshly set QUEUED bit. If BACKOFF is
+ * set, ceph_fault has already scheduled delayed work for us, so we
+ * queue nothing here.
+ */
+static void ceph_queue_con(struct ceph_connection *con)
+{
+ if (test_bit(WAIT, &con->state) ||
+ test_bit(CLOSED, &con->state) ||
+ test_bit(BACKOFF, &con->state)) {
+ dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED|BACKOFF\n",
+ con);
+ return;
+ }
+
+ atomic_inc(&con->nref);
+ dout(40, "ceph_queue_con %p %d -> %d\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
+
+ set_bit(QUEUED, &con->state);
+ if (test_bit(BUSY, &con->state) ||
+ !queue_work(ceph_msgr_wq, &con->work.work)) {
+ dout(40, "ceph_queue_con %p - already BUSY or queued\n", con);
+ put_connection(con);
+ }
+}
+
+/*
+ * Do some work on a connection. Drop a connection ref when we're done.
+ */
static void con_work(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
}
+/*
+ * Generic failure handler: close the socket and retry with
+ * exponential backoff, or go STANDBY if there is nothing to send.
+ */
+static void ceph_fault(struct ceph_connection *con)
+{
+ derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name),
+ IPQUADPORT(con->peer_addr.ipaddr), con->error_msg);
+ dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
+ con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
+
+ if (test_bit(LOSSYTX, &con->state)) {
+ dout(30, "fault on LOSSYTX channel\n");
+ remove_connection(con->msgr, con);
+ return;
+ }
+
+ con_close_socket(con);
+
+ /* hmm? */
+ BUG_ON(test_bit(WAIT, &con->state));
+
+ /*
+ * If there are no messages in the queue, place the
+ * connection in a STANDBY state. Otherwise, retry with
+ * delay.
+ */
+ spin_lock(&con->out_queue_lock);
+ if (list_empty(&con->out_queue)) {
+ dout(10, "fault setting STANDBY\n");
+ set_bit(STANDBY, &con->state);
+ spin_unlock(&con->out_queue_lock);
+ return;
+ }
+
+ dout(10, "fault setting BACKOFF\n");
+ set_bit(BACKOFF, &con->state);
+
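+ /* exponential backoff: start at BASE_DELAY_INTERVAL and
+ * double up to MAX_DELAY_INTERVAL */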
+ if (con->delay == 0)
+ con->delay = BASE_DELAY_INTERVAL;
+ else if (con->delay < MAX_DELAY_INTERVAL)
+ con->delay *= 2;
+
+ atomic_inc(&con->nref);
+ dout(40, "fault queueing %p %d -> %d delay %lu\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref),
+ con->delay);
+ queue_delayed_work(ceph_msgr_wq, &con->work,
+ round_jiffies_relative(con->delay));
+
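+ /* put sent-but-unacked messages back on the queue so they
+ * get resent when we reconnect */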
+ list_splice_init(&con->out_sent, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
+}
+
/*
* worker function when listener receives a connect