#include <net/tcp.h>
#include "messenger.h"
#include "ktcp.h"
+extern int ceph_msgr_debug = 50;
+#define DOUT_VAR ceph_msgr_debug
+#define DOUT_PREFIX "msgr: "
+#include "super.h"
/* static tag bytes */
static char tag_ready = CEPH_MSGR_TAG_READY;
static void try_write(struct work_struct *);
static void try_accept(struct work_struct *);
+/*
+ * failure case
+ * A retry mechanism is used with exponential backoff
+ */
+static void ceph_send_fault(struct ceph_connection *con, int error)
+{
+ derr(1, "connection error %d to peer %x:%d\n", error,
+ ntohl(con->peer_addr.ipaddr.sin_addr.s_addr),
+ ntohs(con->peer_addr.ipaddr.sin_port));
+
+ if (!con->delay) {
+ derr(1, "timeout not set\n");
+ return;
+ }
+
+ switch (error) {
+ /* no space in socket buffer, ceph_write_space will handle
+ * requeueing */
+ case -EAGAIN:
+ break;
+ case -ETIMEDOUT:
+ derr(1, "timed out to peer %x:%d\n",
+ ntohl(con->peer_addr.ipaddr.sin_addr.s_addr),
+ ntohs(con->peer_addr.ipaddr.sin_port));
+ /* peer unreachable */
+ case -EHOSTDOWN:
+ case -EHOSTUNREACH:
+ case -ENETUNREACH:
+ derr(1, "ENETUNREACH set\n");
+ spin_lock(&con->out_queue_lock);
+ list_splice_init(&con->out_sent, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
+ /* retry with delay */
+ queue_delayed_work(send_wq, &con->swork,
+ BASE_RETRY_INTERVAL);
+ break;
+ case -EPIPE:
+ case -ECONNREFUSED:
+ case -ECONNRESET:
+ /* never connected socket. SOCK_DONE flag not set */
+ case -ENOTCONN:
+ derr(1, "ENOTCONN set\n");
+ /* TBD: setup timeout here */
+ if (!test_and_clear_bit(CONNECTING, &con->state)){
+ derr(1, "CONNECTING bit not set\n");
+ /* reset buffer */
+ spin_lock(&con->out_queue_lock);
+ list_splice_init(&con->out_sent,
+ &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
+ clear_bit(OPEN, &con->state);
+ }
+ set_bit(NEW, &con->state);
+ /* retry with delay */
+ queue_delayed_work(send_wq, &con->swork,
+ BASE_RETRY_INTERVAL);
+ break;
+ case -EIO:
+ derr(1, "EIO set\n");
+ /* shutdown or soft timeout */
+
+ default:
+ /* if we ever hit here ... */
+ derr(1, "unrecognized error %d\n", error);
+ }
+}
/*
* calculate the number of pages a given length and offset map onto,
INIT_LIST_HEAD(&con->out_sent);
INIT_WORK(&con->rwork, try_read);
- INIT_WORK(&con->swork, try_write);
+ INIT_DELAYED_WORK(&con->swork, try_write);
return con;
}
struct ceph_messenger *msgr;
int ret = 1;
- con = container_of(work, struct ceph_connection, swork);
+ con = container_of(work, struct ceph_connection, swork.work);
msgr = con->msgr;
dout(30, "try_write start %p state %d\n", con, con->state);
-
-retry:
- if (test_and_set_bit(WRITING, &con->state) != 0) {
- dout(30, "try_write connection already writing\n");
- return;
- }
- clear_bit(WRITEABLE, &con->state);
-
more:
dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
dout(5, "try_write initiated connect\n");
if (ret < 0) {
/* fault */
- derr(1, "connect error, FIXME\n");
+ derr(1, "connect error\n");
+ ceph_send_fault(con, ret);
goto done;
}
}
- /*if (test_bit(CONNECTING, &con->state)) {
- dout(30, "try_write still connecting, doing nothing for now\n");
- goto done;
- }
- */
/* kvec data queued? */
if (con->out_kvec_left) {
put_connection(con);
}
if (ret < 0) {
- /* TBD: handle error; return for now */
- con->error = ret;
+ ceph_send_fault(con, ret);
goto done; /* error */
}
}
ret = write_partial_msg_pages(con, con->out_msg);
if (ret == 0)
goto done;
+ if (ret < 0) {
+ ceph_send_fault(con, ret);
+ goto done;
+ }
}
/* anything else pending? */
} else if (!list_empty(&con->out_queue)) {
prepare_write_message(con);
} else {
+ clear_bit(WRITE_PENDING, &con->state);
/* hmm, nothing to do! No more writes pending? */
dout(30, "try_write nothing else to write.\n");
- clear_bit(WRITING, &con->state); /* clear this first */
- clear_bit(WRITE_PENDING, &con->state); /* and this second, to avoid a race. */
spin_unlock(&con->out_queue_lock);
- return;
+ goto done;
}
spin_unlock(&con->out_queue_lock);
goto more;
done:
dout(30, "try_write done\n");
- clear_bit(WRITING, &con->state);
-
- /*
- * See if we became WRITEABLE again to avoid race against socket.
- * Otherwise, this would be bad:
- * A B
- * - enter try_write, do some work
- * - socket fills, we get -EAGAIN or whatever
- * - socket becomes writeable again, work is queued
- * - new try_write sees WRITING bit, exits
- * - original try_write clears WRITING bit
- */
- if (test_bit(WRITEABLE, &con->state)) {
- dout(30, "try_write writeable flag got set again, looping just in case\n");
- goto retry;
- }
return;
}
* hand off to worker threads ,should be able to write, we want to
* try to write right away, we may have missed socket state change
*/
- queue_work(send_wq, &new_con->swork);
+ queue_work(send_wq, &new_con->swork.work);
done:
return;
}
*
* will take+drop msgr, then connection locks.
*/
-int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
+int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
+ unsigned long timeout)
{
struct ceph_connection *con;
int ret = 0;
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
}
-
+ con->delay = timeout;
/* queue */
spin_lock(&con->out_queue_lock);
msg->hdr.seq = ++con->out_seq;
if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) {
dout(30, "ceph_msg_send queuing new swork on %p\n", con);
- queue_work(send_wq, &con->swork);
+ queue_work(send_wq, &con->swork.work);
dout(30, "ceph_msg_send queued\n");
}