return rc;
}
+/*
+ * Reset a connection. Discard all incoming and outgoing messages
+ * and clear *_seq state.
+ */
+static void reset_connection(struct ceph_connection *con)
+{
+ /* reset connection, out_queue, msg_ and connect_seq */
+ /* discard existing out_queue and msg_seq */
+ spin_lock(&con->out_queue_lock);
+ ceph_msg_put_list(&con->out_queue);
+ ceph_msg_put_list(&con->out_sent);
+
+ con->connect_seq = 0;
+ con->out_seq = 0;
+ con->out_msg = NULL;
+ con->in_seq = 0;
+ spin_unlock(&con->out_queue_lock);
+}
+
/*
* mark a peer down. drop any open connections.
*/
dout("close %p peer %u.%u.%u.%u:%u\n", con,
IPQUADPORT(con->peer_addr.ipaddr));
set_bit(CLOSED, &con->state); /* in case there's queued work */
+ reset_connection(con);
queue_con(con);
}
void ceph_con_destroy(struct ceph_connection *con)
{
dout("con_destroy %p destroying\n", con);
- ceph_msg_put_list(&con->out_queue);
- ceph_msg_put_list(&con->out_sent);
- set_bit(CLOSED, &con->state);
+ reset_connection(con);
+ set_bit(DEAD, &con->state);
con_close_socket(con); /* silently ignore errors */
}
+/*
+ * Reopen a closed connection, with a new peer address.
+ */
+void ceph_con_reopen(struct ceph_connection *con, struct ceph_entity_addr *addr)
+{
+ dout("con_reopen %p %u.%u.%u.%u:%u\n", con, IPQUADPORT(addr->ipaddr));
+ BUG_ON(!test_bit(CLOSED, &con->state));
+ set_bit(REOPEN, &con->state);
+ clear_bit(CLOSED, &con->state);
+ memcpy(&con->peer_addr, addr, sizeof(*addr));
+ queue_con(con);
+}
+
/*
* generic get/put
*/
return 0;
}
-/*
- * Reset a connection. Discard all incoming and outgoing messages
- * and clear *_seq state.
- */
-static void reset_connection(struct ceph_connection *con)
-{
- pr_err("ceph %s%d %u.%u.%u.%u:%u connection reset\n",
- ENTITY_NAME(con->peer_name),
- IPQUADPORT(con->peer_addr.ipaddr));
-
- /* reset connection, out_queue, msg_ and connect_seq */
- /* discard existing out_queue and msg_seq */
- spin_lock(&con->out_queue_lock);
- ceph_msg_put_list(&con->out_queue);
- ceph_msg_put_list(&con->out_sent);
-
- con->connect_seq = 0;
- con->out_seq = 0;
- con->out_msg = NULL;
- con->in_seq = 0;
- con->in_msg = NULL;
- spin_unlock(&con->out_queue_lock);
-}
-
-
static int process_connect(struct ceph_connection *con)
{
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
*/
dout("process_connect got RESET peer seq %u\n",
le32_to_cpu(con->in_connect.connect_seq));
+ pr_err("ceph %s%d %u.%u.%u.%u:%u connection reset\n",
+ ENTITY_NAME(con->peer_name),
+ IPQUADPORT(con->peer_addr.ipaddr));
reset_connection(con);
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
*/
static void process_message(struct ceph_connection *con)
{
+ struct ceph_msg *msg = con->in_msg;
+
+ con->in_msg = NULL;
+
/* if first message, set peer_name */
if (con->peer_name.type == 0)
- con->peer_name = con->in_msg->hdr.src.name;
+ con->peer_name = msg->hdr.src.name;
spin_lock(&con->out_queue_lock);
con->in_seq++;
spin_unlock(&con->out_queue_lock);
dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n",
- con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
- ENTITY_NAME(con->in_msg->hdr.src.name),
- le16_to_cpu(con->in_msg->hdr.type),
- ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
- le32_to_cpu(con->in_msg->hdr.front_len),
- le32_to_cpu(con->in_msg->hdr.data_len),
+ msg, le64_to_cpu(msg->hdr.seq),
+ ENTITY_NAME(msg->hdr.src.name),
+ le16_to_cpu(msg->hdr.type),
+ ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
+ le32_to_cpu(msg->hdr.front_len),
+ le32_to_cpu(msg->hdr.data_len),
con->in_front_crc, con->in_middle_crc, con->in_data_crc);
- con->ops->dispatch(con, con->in_msg);
- con->in_msg = NULL;
+ con->ops->dispatch(con, msg);
prepare_read_tag(con);
}
ret = read_partial_connect(con);
if (ret <= 0)
goto done;
- if (process_connect(con) < 0) {
+ if (process_connect(con) < 0 || test_bit(WAIT, &con->state)) {
ret = -1;
goto out;
}
static void queue_con(struct ceph_connection *con)
{
if (test_bit(WAIT, &con->state) ||
- test_bit(CLOSED, &con->state)) {
- dout("queue_con %p ignoring: WAIT|CLOSED\n",
+ test_bit(DEAD, &con->state)) {
+ dout("queue_con %p ignoring: WAIT|DEAD\n",
con);
return;
}
if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
dout("con_work CLOSED\n");
- ceph_con_destroy(con);
- if (test_and_clear_bit(REOPEN, &con->state)) {
- /* reopen w/ new peer */
- dout("con_work REPOPEN\n");
- clear_bit(CLOSED, &con->state);
- goto more;
- }
+ con_close_socket(con);
goto done;
}
+ if (test_and_clear_bit(REOPEN, &con->state)) {
+ /* reopen w/ new peer */
+ dout("con_work REOPEN\n");
+ con_close_socket(con);
+ }
if (test_bit(WAIT, &con->state)) { /* we are a zombie */
dout("con_work WAIT\n");
goto done;
dout("create ip not specified, initially INADDR_ANY\n");
msgr->inst.addr.ipaddr.sin_addr.s_addr = htonl(INADDR_ANY);
msgr->inst.addr.ipaddr.sin_port = htons(0); /* any port */
+ msgr->inst.addr.nonce = get_random_int();
}
msgr->inst.addr.ipaddr.sin_family = AF_INET;
else
kfree(m->front.iov_base);
kfree(m);
- dout("msg_kfree %p done\n", m);
}
/*