/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
+static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
static void ceph_queue_con(struct ceph_connection *con);
/*
- * create a new connection.
+ * initialize a new connection
*/
-static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
+void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
+ struct ceph_entity_addr *addr)
{
- struct ceph_connection *con;
-
- con = kzalloc(sizeof(*con), GFP_NOFS);
- if (con == NULL)
- return NULL;
con->msgr = msgr;
atomic_set(&con->nref, 1);
INIT_LIST_HEAD(&con->list_all);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);
-
- dout("new connection: %p\n", con);
- return con;
+ con->peer_addr = *addr;
}
/*
set_bit(WRITE_PENDING, &con->state);
}
+/*
+ * Prepare to write keepalive byte.
+ */
+static void prepare_write_keepalive(struct ceph_connection *con)
+{
+ dout("prepare_write_keepalive %p\n", con);
+ con->out_kvec[0].iov_base = &tag_keepalive;
+ con->out_kvec[0].iov_len = 1;
+ con->out_kvec_left = 1;
+ con->out_kvec_bytes = 1;
+ con->out_kvec_cur = con->out_kvec;
+ set_bit(WRITE_PENDING, &con->state);
+}
+
/*
* Connection negotiation.
*/
spin_unlock(&con->out_queue_lock);
}
+ if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
+ prepare_write_keepalive(con);
+ goto more_kvec;
+ }
+
/* Nothing to do! */
clear_bit(WRITE_PENDING, &con->state);
dout("try_write nothing else to write.\n");
/* drop lock while we allocate a new connection */
spin_unlock(&msgr->con_lock);
- newcon = new_connection(msgr);
- if (IS_ERR(newcon))
- return PTR_ERR(con);
+ newcon = kzalloc(sizeof(*con), GFP_NOFS);
+ if (!newcon)
+ return -ENOMEM;
+ ceph_con_init(msgr, newcon, &msg->hdr.dst.addr);
newcon->out_connect.flags = 0;
if (!timeout) {
return ret;
}
+/*
+ * Queue up an outgoing message on the given connection.
+ */
+void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ /* set source */
+ msg->hdr.src = con->msgr->inst;
+ msg->hdr.orig_src = con->msgr->inst;
+
+ /* queue */
+ spin_lock(&con->out_queue_lock);
+ msg->hdr.seq = cpu_to_le64(++con->out_seq);
+ dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg,
+ (unsigned)con->out_seq,
+ ENTITY_NAME(msg->hdr.dst.name), le16_to_cpu(msg->hdr.type),
+ ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
+ le32_to_cpu(msg->hdr.front_len),
+ le32_to_cpu(msg->hdr.middle_len),
+ le32_to_cpu(msg->hdr.data_len));
+ dout("ceph_con_send %p %p seq %llu for %s%d on %p pgs %d\n",
+ con, msg, le64_to_cpu(msg->hdr.seq),
+ ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages);
+ list_add_tail(&msg->list_head, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
+
+ /* if there wasn't anything waiting to send before, queue
+ * new work */
+ if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+ ceph_queue_con(con);
+}
+
+void ceph_con_keepalive(struct ceph_connection *con)
+{
+ if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
+ test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+ ceph_queue_con(con);
+}
+
+
/*
* construct a new message with given type, size
* the new msg has a ref count of 1.
}
}
-#define CEPH_MSGR_BACKUP 10 /* backlogged incoming connections */
-
/* use format string %s%d */
#define ENTITY_NAME(n) \
ceph_name_type_str(le32_to_cpu((n).type)), \
#define LOSSYTX 0 /* we can close channel or drop messages on errors */
#define LOSSYRX 1 /* peer may reset/drop messages */
#define CONNECTING 2
+#define KEEPALIVE_PENDING 3
#define WRITE_PENDING 4 /* we have data ready to send */
#define QUEUED 5 /* there is work queued on this connection */
#define BUSY 6 /* work is being done */
* messages in the case of a TCP disconnect.
*/
struct ceph_connection {
+ void *private;
+
struct ceph_messenger *msgr;
struct socket *sock;
unsigned long state; /* connection state (see flags above) */
struct list_head out_queue;
struct list_head out_sent; /* sending/sent but unacked */
u32 out_seq; /* last message queued for send */
+ bool out_keepalive_pending;
u32 in_seq, in_seq_acked; /* last message received, acked */
extern struct ceph_messenger *
ceph_messenger_create(struct ceph_entity_addr *myaddr);
extern void ceph_messenger_destroy(struct ceph_messenger *);
+
+extern void ceph_con_init(struct ceph_messenger *msgr,
+ struct ceph_connection *con,
+ struct ceph_entity_addr *addr);
+extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
+
+
extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
struct ceph_entity_addr *addr);