static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
-static void ceph_queue_con(struct ceph_connection *con);
+static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void ceph_fault(struct ceph_connection *con);
if (sk->sk_state != TCP_CLOSE_WAIT) {
dout("ceph_data_ready on %p state = %lu, queueing work\n",
con, con->state);
- ceph_queue_con(con);
+ queue_con(con);
}
}
/* only queue to workqueue if there is data we want to write. */
if (test_bit(WRITE_PENDING, &con->state)) {
dout("ceph_write_space %p queueing write work\n", con);
- ceph_queue_con(con);
+ queue_con(con);
} else {
dout("ceph_write_space %p nothing to write\n", con);
}
con->error_msg = "connection failed";
else
con->error_msg = "socket closed";
- ceph_queue_con(con);
+ queue_con(con);
break;
case TCP_ESTABLISHED:
dout("ceph_state_change TCP_ESTABLISHED\n");
- ceph_queue_con(con);
+ queue_con(con);
break;
}
}
dout("close %p peer %u.%u.%u.%u:%u\n", con,
IPQUADPORT(con->peer_addr.ipaddr));
set_bit(CLOSED, &con->state); /* in case there's queued work */
+ queue_con(con);
}
/*
* we give up (work also already being done or is queued) but leave QUEUED
* set so that the worker thread will loop if necessary.
*/
-static void ceph_queue_con(struct ceph_connection *con)
+static void queue_con(struct ceph_connection *con)
{
if (test_bit(WAIT, &con->state) ||
test_bit(CLOSED, &con->state)) {
- dout("ceph_queue_con %p ignoring: WAIT|CLOSED\n",
+ dout("queue_con %p ignoring: WAIT|CLOSED\n",
con);
return;
}
if (!con->ops->get(con)) {
- dout("ceph_queue_con %p ref count 0\n", con);
+ dout("queue_con %p ref count 0\n", con);
return;
}
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
!queue_work(ceph_msgr_wq, &con->work.work)) {
- dout("ceph_queue_con %p - already BUSY or queued\n", con);
+ dout("queue_con %p - already BUSY or queued\n", con);
con->ops->put(con);
} else {
- dout("ceph_queue_con %p\n", con);
+ dout("queue_con %p\n", con);
}
}
if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
dout("con_work CLOSED\n");
+ ceph_con_destroy(con);
+ if (test_and_clear_bit(REOPEN, &con->state)) {
+ /* reopen w/ new peer */
+ dout("con_work REPOPEN\n");
+ clear_bit(CLOSED, &con->state);
+ goto more;
+ }
goto done;
}
if (test_bit(WAIT, &con->state)) { /* we are a zombie */
*/
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
- /* set source */
+ if (test_bit(CLOSED, &con->state)) {
+ dout("con_send %p closed, dropping %p\n", con, msg);
+ ceph_msg_put(msg);
+ return;
+ }
+
+ /* set src+dst */
msg->hdr.src = con->msgr->inst;
msg->hdr.orig_src = con->msgr->inst;
msg->hdr.dst.addr = con->peer_addr;
/* if there wasn't anything waiting to send before, queue
* new work */
if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
- ceph_queue_con(con);
+ queue_con(con);
}
/*
{
if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
test_and_set_bit(WRITE_PENDING, &con->state) == 0)
- ceph_queue_con(con);
+ queue_con(con);
}