static void try_read(struct work_struct *);
static void try_write(struct work_struct *);
-static void try_write_later(struct work_struct *);
static void try_accept(struct work_struct *);
INIT_LIST_HEAD(&con->out_sent);
INIT_WORK(&con->rwork, try_read);
- INIT_WORK(&con->swork, try_write);
- INIT_DELAYED_WORK(&con->delaywork, try_write_later);
+ INIT_DELAYED_WORK(&con->swork, try_write);
return con;
}
dout(20, "put_connection nref = %d\n", atomic_read(&con->nref));
if (atomic_dec_and_test(&con->nref)) {
dout(20, "put_connection destroying %p\n", con);
- sock_release(con->sock);
+ if (con->sock)
+ sock_release(con->sock);
kfree(con);
con = NULL;
}
*/
void ceph_queue_write(struct ceph_connection *con)
{
- if (test_and_set_bit(WRITEABLE, &con->state) != 0) {
- dout(40, "ceph_queue_write %p - already WRITEABLE\n", con);
- return;
- }
- if (test_bit(WRITING, &con->state)) {
- dout(40, "ceph_queue_write %p - still WRITING, queuing nothing\n", con);
- return;
- }
dout(40, "ceph_queue_write %p\n", con);
atomic_inc(&con->nref);
+ if (!queue_work(send_wq, &con->swork.work)) { /* swork is set up with INIT_DELAYED_WORK; queue its embedded work_struct on send_wq without a delay */
+ dout(40, "ceph_queue_write %p - already queued\n", con);
+ put_connection(con); /* queue_work() returned 0, so no new work item will run: drop the reference taken by atomic_inc() above */
+ }
}
void ceph_queue_delayed_write(struct ceph_connection *con)
{
dout(40, "ceph_queue_delayed_write %p delay %lu\n", con, con->delay);
atomic_inc(&con->nref);
- set_bit(WAITING, &con->state);
- if (!queue_delayed_work(send_wq, &con->delaywork, con->delay)) {
+ if (!queue_delayed_work(send_wq, &con->swork, con->delay)) {
dout(40, "ceph_queue_delayed_write %p - already queued\n", con);
put_connection(con);
}
* failure case
* A retry mechanism is used with exponential backoff
*/
-static void ceph_send_fault(struct ceph_connection *con, int error)
+static void ceph_send_fault(struct ceph_connection *con)
{
- derr(1, "ceph_send_fault %p state %lu error %d to peer %x:%d\n",
- con, con->state, error,
+ derr(1, "ceph_send_fault %p state %lu to peer %x:%d\n",
+ con, con->state,
ntohl(con->peer_addr.ipaddr.sin_addr.s_addr),
ntohs(con->peer_addr.ipaddr.sin_port));
- if (!con->delay) {
- /*
- * lossy channel. don't bother with any retry.
- */
- if (error == -EAGAIN) {
- derr(1, "ceph_send_fault lossy channel, ignoring EAGAIN\n");
- return;
- }
- dout(1, "ceph_send_fault lossy channel, giving up\n");
- remove_connection(con->msgr, con);
- return;
+ if (!test_and_clear_bit(CONNECTING, &con->state)){
+ derr(1, "CONNECTING bit not set\n");
+ /* TBD: reset buffer correctly */
+ /* reset buffer */
+ spin_lock(&con->out_queue_lock);
+ list_splice_init(&con->out_sent, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
+ clear_bit(OPEN, &con->state);
}
+ set_bit(NEW, &con->state);
+ clear_bit(CLOSED, &con->state);
+ sock_release(con->sock);
+ /* retry with delay */
+ ceph_queue_delayed_write(con);
- switch (error) {
- case -ETIMEDOUT:
- derr(10, "timed out to peer %x:%d\n",
- ntohl(con->peer_addr.ipaddr.sin_addr.s_addr),
- ntohs(con->peer_addr.ipaddr.sin_port));
- /* peer unreachable, TBD: need to research if should retry
- * a few times first, then close socket and try reconnecting */
- case -EHOSTDOWN:
- case -EHOSTUNREACH:
- case -ENETUNREACH:
- derr(10, "ceph_send_fault peer unreachable via route state = %lu\n", con->state);
- /* TBD: reset buffer correctly */
- /* spin_lock(&con->out_queue_lock);
- list_splice_init(&con->out_sent, &con->out_queue);
- spin_unlock(&con->out_queue_lock); */
- /* retry with delay */
- ceph_queue_delayed_write(con);
- break;
- case -EAGAIN:
- /* no space in socket buffer, ceph_write_space will
- * handle requeueing */
- if (test_bit(OPEN, &con->state))
- return;
- case -EPIPE:
- /* TBD: may do something different here, may not be allowed */
- case -ECONNREFUSED:
- case -ECONNRESET:
- /* never connected socket. SOCK_DONE flag not set */
- case -ENOTCONN:
- derr(10, "ceph_send_fault no connection state = %lu\n", con->state);
- /* TBD: setup timeout here */
- if (!test_and_clear_bit(CONNECTING, &con->state)){
- derr(1, "CONNECTING bit not set\n");
- /* TBD: reset buffer correctly */
- /* reset buffer */
- spin_lock(&con->out_queue_lock);
- list_splice_init(&con->out_sent,
- &con->out_queue);
- spin_unlock(&con->out_queue_lock);
- clear_bit(OPEN, &con->state);
- }
- set_bit(NEW, &con->state);
- clear_bit(CLOSED, &con->state);
- sock_release(con->sock);
- /* retry with delay */
- ceph_queue_delayed_write(con);
- break;
- case -EIO:
- derr(10, "ceph_send_fault EIO set\n");
- /* shutdown or soft timeout */
-
- default:
- /* if we ever hit here ... */
- derr(10, "ceph_send_fault unrecognized error %d\n",
- error);
- }
if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2;
else
/*
* worker function when socket is writeable
*/
-static void _try_write(struct ceph_connection *con)
+static void try_write(struct work_struct *work) /* work: the work_struct embedded in a connection's swork member */
{
+ struct ceph_connection *con = container_of(work, struct ceph_connection, swork.work); /* recover the owning connection from its embedded swork.work */
struct ceph_messenger *msgr = con->msgr;
int ret = 1;
dout(30, "try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref));
-retry:
- clear_bit(WAITING, &con->state);
- if (test_and_set_bit(WRITING, &con->state)) {
- dout(20, "try_write already writing\n");
- goto out;
- }
- clear_bit(WRITEABLE, &con->state);
-
/* Peer initiated close, other end is waiting for us to close */
if (test_and_clear_bit(CLOSING, &con->state)) {
dout(5, "try_write peer initiated close\n");
remove_connection(msgr, con);
goto done;
}
+
if (test_bit(CLOSED, &con->state)) {
dout(5, "try_write closed\n");
remove_connection(msgr, con);
goto done;
}
+
+ if (con->sock && con->sock->sk->sk_state == TCP_CLOSE) { /* NOTE(review): ceph_send_fault() calls sock_release(con->sock) and, in the lines visible in this patch, does not clear con->sock - confirm the pointer is reset elsewhere before it is dereferenced here */
+ dout(5, "try_write TCP_CLOSE received\n");
+ if (!con->delay) { /* zero delay: remove the connection instead of going through ceph_send_fault() */
+ dout(5, "try_write tcp_close delay = 0\n");
+ remove_connection(msgr, con);
+ goto done;
+ } else {
+ dout(5, "try_write tcp_close delay != 0\n");
+ ceph_send_fault(con); /* non-zero delay: run the fault path, then fall through to "more:" because the goto below is commented out */
+ /* PW hmm, keep getting tcp_close so need to drop through..
+ * I want to play around with it a little more.. */
+ /* goto done; */
+ }
+ }
more:
dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
ret = ceph_tcp_connect(con);
dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state);
if (ret < 0) {
- /* fault */
- derr(1, "try_write connect error\n");
- ceph_send_fault(con, ret);
+ derr(1, "try_write connect error, maybe we need to cleanup?\n"); /* NOTE(review): the connect failure is now only logged; ceph_send_fault() is no longer called on this path */
goto done;
}
}
remove_connection(msgr, con);
}
if (ret < 0) {
- ceph_send_fault(con, ret);
+ /* TBD: seems to always close on error, so remove this once we're sure */
+ /* ceph_send_fault(con, ret); */
goto done; /* error */
}
}
if (ret == 0)
goto done;
if (ret < 0) {
- ceph_send_fault(con, ret);
+ /* TBD: seems to always close on error, so remove this once we're sure */
+ /* ceph_send_fault(con, ret); */
goto done;
}
}
/* hmm, nothing to do! No more writes pending? */
dout(30, "try_write nothing else to write.\n");
spin_unlock(&con->out_queue_lock);
+ /* TBD PW remove from wait queue if still there.. and put_connection */
+ if (con->delay > 1) con->delay = BASE_DELAY_INTERVAL; /* nothing left to write: a delay greater than 1 is set back to BASE_DELAY_INTERVAL (presumably undoing the doubling done in ceph_send_fault - confirm) */
goto done;
}
spin_unlock(&con->out_queue_lock);
goto more;
done:
- clear_bit(WRITING, &con->state);
- if (test_bit(WRITEABLE, &con->state) && !test_bit(WAITING, &con->state)) {
- dout(30, "try_write writeable flag set again, looping\n");
- goto retry;
- }
-
-out:
dout(30, "try_write done on %p\n", con);
put_connection(con);
return;
}
-static void try_write_later(struct work_struct *work)
-{
- struct ceph_connection *con;
- con = container_of(work, struct ceph_connection, delaywork.work);
- _try_write(con);
-}
-
-static void try_write(struct work_struct *work)
-{
- struct ceph_connection *con;
- con = container_of(work, struct ceph_connection, swork);
- _try_write(con);
-}
-
-
/*
* prepare to read a message
*/
dout(10, "mark_down dropping %p\n", con);
set_bit(CLOSED, &con->state); /* in case there is queued work */
__remove_connection(msgr, con);
+ put_connection(con);
}
spin_unlock(&msgr->con_lock);
}