From 207880e9bec312f4ffc9f8b53b3467c2b518d214 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 25 Aug 2009 16:49:28 -0700 Subject: [PATCH] kclient: factor out connection creation; add con_send, con_keepalive Mmm, much cleaner so far! --- src/TODO | 9 ++++- src/include/msgr.h | 1 + src/kernel/messenger.c | 80 +++++++++++++++++++++++++++++++++++------- src/kernel/messenger.h | 13 +++++-- 4 files changed, 87 insertions(+), 16 deletions(-) diff --git a/src/TODO b/src/TODO index e360d960be543..e9f70cc531454 100644 --- a/src/TODO +++ b/src/TODO @@ -26,9 +26,16 @@ v0.14 - radosgw - uclient: fix write vs max_size? -- msgr: unidirectional option - mds: put migration vectors in mdsmap +- msgr: unidirectional option + - what about mon -> mds/osd messages? +- kclient msgr fixups + - private field in ceph_connection + - kill radix_tree; allocate connections in callers (osdc, monc, mdsc) + - simplify ceph_ping (should be a single byte) + + bugs - premature filejournal trimming? - weird osd_lock contention during osd restart? diff --git a/src/include/msgr.h b/src/include/msgr.h index 52d2555743fab..50859cf4f47fd 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -92,6 +92,7 @@ struct ceph_entity_inst { #define CEPH_MSGR_TAG_CLOSE 6 /* closing pipe */ #define CEPH_MSGR_TAG_MSG 10 /* message */ #define CEPH_MSGR_TAG_ACK 11 /* message ack */ +#define CEPH_MSGR_TAG_KEEPALIVE 12 /* just a keepalive byte! */ /* diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 8d618f23ba941..e95ebd664b911 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -35,6 +35,7 @@ /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; +static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; static void ceph_queue_con(struct ceph_connection *con); @@ -216,15 +217,11 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, /* - * create a new connection. + * initialize a new connection */ -static struct ceph_connection *new_connection(struct ceph_messenger *msgr) +void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con, + struct ceph_entity_addr *addr) { - struct ceph_connection *con; - - con = kzalloc(sizeof(*con), GFP_NOFS); - if (con == NULL) - return NULL; con->msgr = msgr; atomic_set(&con->nref, 1); INIT_LIST_HEAD(&con->list_all); @@ -233,9 +230,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); - - dout("new connection: %p\n", con); - return con; + con->peer_addr = *addr; } /* @@ -553,6 +548,20 @@ static void prepare_write_ack(struct ceph_connection *con) set_bit(WRITE_PENDING, &con->state); } +/* + * Prepare to write keepalive byte. + */ +static void prepare_write_keepalive(struct ceph_connection *con) +{ + dout("prepare_write_keepalive %p\n", con); + con->out_kvec[0].iov_base = &tag_keepalive; + con->out_kvec[0].iov_len = 1; + con->out_kvec_left = 1; + con->out_kvec_bytes = 1; + con->out_kvec_cur = con->out_kvec; + set_bit(WRITE_PENDING, &con->state); +} + /* * Connection negotiation. */ @@ -1350,6 +1359,11 @@ more_kvec: spin_unlock(&con->out_queue_lock); } + if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { + prepare_write_keepalive(con); + goto more_kvec; + } + /* Nothing to do! */ clear_bit(WRITE_PENDING, &con->state); dout("try_write nothing else to write.\n"); @@ -1781,9 +1795,10 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, /* drop lock while we allocate a new connection */ spin_unlock(&msgr->con_lock); - newcon = new_connection(msgr); - if (IS_ERR(newcon)) - return PTR_ERR(con); + newcon = kzalloc(sizeof(*con), GFP_NOFS); + if (!newcon) + return -ENOMEM; + ceph_con_init(msgr, newcon, &msg->hdr.dst.addr); newcon->out_connect.flags = 0; if (!timeout) { @@ -1860,6 +1875,45 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, return ret; } +/* + * Queue up an outgoing message on the given connection. + */ +void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) +{ + /* set source */ + msg->hdr.src = con->msgr->inst; + msg->hdr.orig_src = con->msgr->inst; + + /* queue */ + spin_lock(&con->out_queue_lock); + msg->hdr.seq = cpu_to_le64(++con->out_seq); + dout("----- %p %u to %s%d %d=%s len %d+%d+%d -----\n", msg, + (unsigned)con->out_seq, + ENTITY_NAME(msg->hdr.dst.name), le16_to_cpu(msg->hdr.type), + ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), + le32_to_cpu(msg->hdr.front_len), + le32_to_cpu(msg->hdr.middle_len), + le32_to_cpu(msg->hdr.data_len)); + dout("ceph_con_send %p %p seq %llu for %s%d on %p pgs %d\n", + con, msg, le64_to_cpu(msg->hdr.seq), + ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages); + list_add_tail(&msg->list_head, &con->out_queue); + spin_unlock(&con->out_queue_lock); + + /* if there wasn't anything waiting to send before, queue + * new work */ + if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) + ceph_queue_con(con); +} + +void ceph_con_keepalive(struct ceph_connection *con) +{ + if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && + test_and_set_bit(WRITE_PENDING, &con->state) == 0) + ceph_queue_con(con); +} + + /* * construct a new message with given type, size * the new msg has a ref count of 1. diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 36e935d314681..366f415fae01c 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -54,8 +54,6 @@ static inline const char *ceph_name_type_str(int t) } } -#define CEPH_MSGR_BACKUP 10 /* backlogged incoming connections */ - /* use format string %s%d */ #define ENTITY_NAME(n) \ ceph_name_type_str(le32_to_cpu((n).type)), \ @@ -127,6 +125,7 @@ struct ceph_msg_pos { #define LOSSYTX 0 /* we can close channel or drop messages on errors */ #define LOSSYRX 1 /* peer may reset/drop messages */ #define CONNECTING 2 +#define KEEPALIVE_PENDING 3 #define WRITE_PENDING 4 /* we have data ready to send */ #define QUEUED 5 /* there is work queued on this connection */ #define BUSY 6 /* work is being done */ @@ -147,6 +146,8 @@ struct ceph_msg_pos { * messages in the case of a TCP disconnect. */ struct ceph_connection { + void *private; + struct ceph_messenger *msgr; struct socket *sock; unsigned long state; /* connection state (see flags above) */ @@ -168,6 +169,7 @@ struct ceph_connection { struct list_head out_queue; struct list_head out_sent; /* sending/sent but unacked */ u32 out_seq; /* last message queued for send */ + bool out_keepalive_pending; u32 in_seq, in_seq_acked; /* last message received, acked */ @@ -217,6 +219,13 @@ extern void ceph_msgr_exit(void); extern struct ceph_messenger * ceph_messenger_create(struct ceph_entity_addr *myaddr); extern void ceph_messenger_destroy(struct ceph_messenger *); + +extern void ceph_con_init(struct ceph_messenger *msgr, + struct ceph_connection *con, + struct ceph_entity_addr *addr); +extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); + + extern void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr); -- 2.39.5