atomic_read(&con->nref), atomic_read(&con->nref) + 1);
atomic_inc(&con->nref);
- /* if were just ACCEPTING this connection, it is already on the
+	/* if we're just ACCEPTING this connection, it is already on the
* con_all and con_accepting lists. */
if (test_and_clear_bit(ACCEPTING, &con->state)) {
list_del_init(&con->list_bucket);
con->out_msg->footer.data_crc = 0;
/* is there a data payload? */
- if (le32_to_cpu(m->hdr.data_len) == 0) {
+ if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
con->out_msg_pos.page_pos =
con->out_more = 1; /* data + footer will follow */
} else {
/* no, queue up footer too and be done */
- prepare_write_message_footer(con, v);
+ prepare_write_message_footer(con, v);
}
set_bit(WRITE_PENDING, &con->state);
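/*
 * Editor's sketch of the send-side framing implied above (not part of
 * this patch): a message goes out roughly as
 *
 *	header | front (front_len) | data pages (data_len) | footer
 *
 * When data_len > 0, out_more is set and the footer is written only
 * after the page iterator has streamed all the data; otherwise the
 * footer is queued immediately and the message is complete.
 */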
con->out_msg_pos.page_pos, len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
-
+
if (crc && msg->pages)
kunmap(page);
con->out_msg->footer.flags |=
cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
con->out_kvec_bytes = 0;
- con->out_kvec_left = 0;
+ con->out_kvec_left = 0;
con->out_kvec_cur = con->out_kvec;
prepare_write_message_footer(con, 0);
ret = 1;
* Atomically queue work on a connection. Bump @con reference to
* avoid races with connection teardown.
*
- * There is some trickery going on with QUEUED, BUSY, and BACKOFF:
+ * There is some trickery going on with QUEUED, BUSY, and BACKOFF because
+ * we only want a _single_ thread operating on each connection at any point
+ * in time, but we want to use all available CPUs.
+ *
+ * The worker thread only proceeds if it can atomically set BUSY. It
+ * clears QUEUED and does its thing. When it thinks it's done, it
+ * clears BUSY, then rechecks QUEUED; if it's set again, it loops
+ * (tries again to set BUSY). See the sketch below.
*
- *
+ * To queue work, we first set QUEUED; _then_, if BUSY isn't set, we
+ * try to queue work. If that fails, the work is already queued or
+ * already being done, so we give up, but we leave QUEUED set so that
+ * the worker thread will loop again if necessary.
+ *
+ * BACKOFF is set by the fault error path to prevent the worker thread from
+ * looping or socket events from requeuing work; instead, we schedule delayed
+ * work explicitly.
*/
static void ceph_queue_con(struct ceph_connection *con)
{
dout(5, "con_work CLOSED\n");
goto done;
}
- if (test_bit(WAIT, &con->state)) {
+ if (test_bit(WAIT, &con->state)) { /* we are a zombie */
dout(5, "con_work WAIT\n");
goto done;
}
if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
try_read(con) < 0 ||
try_write(con) < 0)
- ceph_fault(con);
+ ceph_fault(con); /* error/fault path */
done:
clear_bit(BUSY, &con->state);
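/*
 * Editor's sketch (not part of this patch) of the QUEUED/BUSY
 * handshake described above; con_work_do() is a hypothetical
 * stand-in for the try_read()/try_write() processing.
 */
static void con_work_sketch(struct ceph_connection *con)
{
more:
	if (test_and_set_bit(BUSY, &con->state))
		return;		/* another thread owns this connection */
	clear_bit(QUEUED, &con->state);
	con_work_do(con);	/* read/write, fault handling, etc. */
	clear_bit(BUSY, &con->state);
	if (test_bit(QUEUED, &con->state) && !test_bit(BACKOFF, &con->state))
		goto more;	/* work was requeued while we were BUSY */
}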
/*
- * failure case
- * A retry mechanism is used with exponential backoff
+ * Generic error/fault handler. A retry mechanism is used with
+ * exponential backoff.
*/
static void ceph_fault(struct ceph_connection *con)
{
con_close_socket(con);
- /* hmm? */
- BUG_ON(test_bit(WAIT, &con->state));
-
- /*
- * If there are no messages in the queue, place the
- * connection in a STANDBY state. otherwise, retry with
- * delay
- */
+ /* If there are no messages in the queue, place the connection
+ * in a STANDBY state (i.e., don't try to reconnect just yet). */
spin_lock(&con->out_queue_lock);
if (list_empty(&con->out_queue)) {
dout(10, "fault setting STANDBY\n");
return;
}
+ /* Requeue anything that hasn't been acked, and retry after a
+ * delay. */
+ list_splice_init(&con->out_sent, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
+
+	/* set BACKOFF so that the caller does not loop, and to prevent
+	 * anything from requeuing work on this connection. */
dout(10, "fault setting BACKOFF\n");
set_bit(BACKOFF, &con->state);
+ clear_bit(BUSY, &con->state); /* to avoid an improbable race */
if (con->delay == 0)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2;
- atomic_inc(&con->nref);
+ /* explicitly schedule work to try to reconnect again later. */
dout(40, "fault queueing %p %d -> %d delay %lu\n", con,
- atomic_read(&con->nref) - 1, atomic_read(&con->nref),
+ atomic_read(&con->nref), atomic_read(&con->nref) + 1,
con->delay);
+ atomic_inc(&con->nref);
queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay));
-
- list_splice_init(&con->out_sent, &con->out_queue);
- spin_unlock(&con->out_queue_lock);
}
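/*
 * Editor's illustration (not part of this patch): the doubling above,
 * restated as a pure helper. Delays progress BASE, 2*BASE, 4*BASE, ...
 * and stop doubling once the delay reaches MAX_DELAY_INTERVAL.
 */
static unsigned long next_fault_delay(unsigned long delay)
{
	if (delay == 0)
		return BASE_DELAY_INTERVAL;
	if (delay < MAX_DELAY_INTERVAL)
		return delay * 2;
	return delay;		/* capped; no further doubling */
}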
/*
- * worker function when listener receives a connect
+ * Handle an incoming connection.
*/
-static void try_accept(struct work_struct *work)
+static void accept_work(struct work_struct *work)
{
struct ceph_connection *newcon = NULL;
- struct ceph_messenger *msgr;
-
- msgr = container_of(work, struct ceph_messenger, awork);
-
- dout(5, "Entered try_accept\n");
+ struct ceph_messenger *msgr = container_of(work, struct ceph_messenger,
+ awork);
/* initialize the msgr connection */
newcon = new_connection(msgr);
if (newcon == NULL) {
derr(1, "kmalloc failure accepting new connection\n");
- goto done;
+ return;
}
- newcon->connect_seq = 1;
set_bit(ACCEPTING, &newcon->state);
+ newcon->connect_seq = 1;
newcon->in_tag = CEPH_MSGR_TAG_READY; /* eventually, hopefully */
if (ceph_tcp_accept(msgr->listen_sock, newcon) < 0) {
derr(1, "error accepting connection\n");
put_connection(newcon);
- goto done;
+ return;
}
dout(5, "accepted connection \n");
prepare_write_accept_hello(msgr, newcon);
add_connection_accepting(msgr, newcon);
- /* hand off to work queue; we may have missed socket state change */
+ /* queue work explicitly; we may have missed the socket state
+ * change before setting the socket callbacks. */
ceph_queue_con(newcon);
-done:
- return;
}
+
+
/*
* create a new messenger instance, creates listening socket
*/
msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
if (msgr == NULL)
return ERR_PTR(-ENOMEM);
- INIT_WORK(&msgr->awork, try_accept);
+
+ INIT_WORK(&msgr->awork, accept_work);
spin_lock_init(&msgr->con_lock);
INIT_LIST_HEAD(&msgr->con_all);
INIT_LIST_HEAD(&msgr->con_accepting);
INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC);
spin_lock_init(&msgr->global_seq_lock);
+ /* the zero page is needed if a request is "canceled" while the message
+ * is being written over the socket */
msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
if (!msgr->zero_page) {
kfree(msgr);
dout(1, "messenger %p listening on %u.%u.%u.%u:%u\n", msgr,
IPQUADPORT(msgr->inst.addr.ipaddr));
-
return msgr;
}
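/*
 * Editor's sketch of what the zero page is for (not part of this
 * patch; the exact call site may differ): if a message's data pages
 * are revoked mid-send, the writer substitutes zeros so the data_len
 * already advertised in the header is still honored on the wire.
 */
page = msg->pages ? msg->pages[con->out_msg_pos.page] : msgr->zero_page;
ret = kernel_sendpage(con->sock, page, con->out_msg_pos.page_pos, len,
		      MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);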
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
__remove_connection(msgr, con);
- /* in case there's queued work... */
+	/* in case there's queued work: drop a reference if
+	 * we successfully cancel it. */
spin_unlock(&msgr->con_lock);
if (cancel_delayed_work_sync(&con->work))
put_connection(con);
__free_page(msgr->zero_page);
kfree(msgr);
-
dout(10, "destroyed messenger %p\n", msgr);
}
/*
- * mark a peer down. drop any open connection.
+ * mark a peer down. drop any open connections.
*/
void ceph_messenger_mark_down(struct ceph_messenger *msgr,
struct ceph_entity_addr *addr)
/*
- * a single ceph_msg can't be queued for send twice, unless it's
- * already been delivered (i.e. we have the only remaining reference).
- * so, dup the message if there is more than once reference.
+ * A single ceph_msg can't be queued for send twice, unless it's
+ * already been delivered (i.e. we have the only remaining reference),
+ * because of the list_head indicating which queue it is on.
+ *
+ * So, we dup the message if there is more than one reference. If it has
+ * pages (a data payload), steal the pages away from the old message.
*/
struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
{
le32_to_cpu(old->hdr.data_len),
le32_to_cpu(old->hdr.data_off),
old->pages);
- BUG_ON(!dup);
+ if (!dup)
+ return ERR_PTR(-ENOMEM);
memcpy(dup->front.iov_base, old->front.iov_base,
le32_to_cpu(old->hdr.front_len));
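/*
 * Caller-side sketch (editor's note, not part of this patch): before
 * requeuing a message that another holder may still reference, dup it
 * and queue the private copy instead.
 */
msg = ceph_msg_maybe_dup(msg);
if (IS_ERR(msg))
	return PTR_ERR(msg);	/* -ENOMEM if the dup allocation failed */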
/*
- * queue up an outgoing message.
+ * Queue up an outgoing message.
*
- * this consumes a msg reference. that is, if the caller wants to
+ * This consumes a msg reference. That is, if the caller wants to
* keep @msg around, it had better call ceph_msg_get first.
*/
int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
if (!con) {
/* drop lock while we allocate a new connection */
spin_unlock(&msgr->con_lock);
+
newcon = new_connection(msgr);
if (IS_ERR(newcon))
return PTR_ERR(con);
newcon->out_connect.flags = 0;
- if (!timeout)
+ if (!timeout) {
+ dout(10, "ceph_msg_send setting LOSSYTX\n");
newcon->out_connect.flags |= CEPH_MSG_CONNECT_LOSSYTX;
+ set_bit(LOSSYTX, &newcon->state);
+ }
ret = radix_tree_preload(GFP_NOFS);
if (ret < 0) {
spin_unlock(&msgr->con_lock);
}
- if (!timeout) {
- dout(10, "ceph_msg_send setting LOSSYTX\n");
- set_bit(LOSSYTX, &con->state);
- }
-
/* queue */
spin_lock(&con->out_queue_lock);
+
+ /* avoid queuing multiple PING messages in a row. */
if (unlikely(le16_to_cpu(msg->hdr.type) == CEPH_MSG_PING &&
!list_empty(&con->out_queue) &&
- le16_to_cpu(list_entry(con->out_queue.prev, struct ceph_msg,
- list_head)->hdr.type) == CEPH_MSG_PING)) {
- /* don't queue multiple pings in a row */
+ le16_to_cpu(list_entry(con->out_queue.prev,
+ struct ceph_msg,
+ list_head)->hdr.type) == CEPH_MSG_PING)) {
dout(2, "ceph_msg_send dropping dup ping\n");
ceph_msg_put(msg);
} else {
}
spin_unlock(&con->out_queue_lock);
+ /* if there wasn't anything waiting to send before, queue
+ * new work */
if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
ceph_queue_con(con);
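/*
 * Editor's note on the idiom above: test_and_set_bit() atomically sets
 * the bit and returns its previous value, so exactly one sender sees
 * the 0 -> 1 transition of WRITE_PENDING and queues the work; later
 * senders just enqueue their message and let the pending worker drain
 * the queue.
 */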
m->nr_pages = calc_pages_for(page_off, page_len);
m->pages = pages;
- dout(20, "ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, m->nr_pages);
+ dout(20, "ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
+ m->nr_pages);
return m;
out2:
dout(20, "ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
atomic_read(&m->nref)-1);
if (atomic_read(&m->nref) <= 0) {
- derr(0, "bad ceph_msg_put on %p %llu from %s%d %d=%s len %d+%d\n",
+ derr(0, "bad ceph_msg_put on %p %llu %s%d->%s%d %d=%s %d+%d\n",
m, le64_to_cpu(m->hdr.seq),
ENTITY_NAME(m->hdr.src.name),
+ ENTITY_NAME(m->hdr.dst.name),
le16_to_cpu(m->hdr.type),
ceph_msg_type_name(le16_to_cpu(m->hdr.type)),
le32_to_cpu(m->hdr.front_len),