From 431d03465a8818f363860b27686cfae19d39151b Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 2 Nov 2007 22:51:30 +0000 Subject: [PATCH] accepter bits; no connect, yet git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2017 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/include/ceph_fs.h | 10 +- trunk/ceph/kernel/kmsg.h | 34 ++--- trunk/ceph/kernel/messenger.c | 237 ++++++++++++++++++++++++++++++++-- 3 files changed, 250 insertions(+), 31 deletions(-) diff --git a/trunk/ceph/include/ceph_fs.h b/trunk/ceph/include/ceph_fs.h index b9d2d09e055ad..c01f11e3bcfa4 100644 --- a/trunk/ceph/include/ceph_fs.h +++ b/trunk/ceph/include/ceph_fs.h @@ -142,11 +142,11 @@ struct ceph_entity_name { #define CEPH_ENTITY_TYPE_CLIENT 4 #define CEPH_ENTITY_TYPE_ADMIN 5 -#define CEPH_MSGR_TAG_READY 0 // server -> client + oseq: ready for messages -#define CEPH_MSGR_TAG_REJECT 1 // server -> client + oseq: decline socket -#define CEPH_MSGR_TAG_MSG 2 // message -#define CEPH_MSGR_TAG_ACK 3 // message ack -#define CEPH_MSGR_TAG_CLOSE 4 // closing pipe +#define CEPH_MSGR_TAG_READY 1 // server -> client + cseq: ready for messages +#define CEPH_MSGR_TAG_REJECT 2 // server -> client + cseq: decline socket +#define CEPH_MSGR_TAG_MSG 3 // message +#define CEPH_MSGR_TAG_ACK 4 // message ack +#define CEPH_MSGR_TAG_CLOSE 5 // closing pipe /* diff --git a/trunk/ceph/kernel/kmsg.h b/trunk/ceph/kernel/kmsg.h index cff51cdba7eae..b1b0004cd7154 100644 --- a/trunk/ceph/kernel/kmsg.h +++ b/trunk/ceph/kernel/kmsg.h @@ -14,23 +14,27 @@ extern struct task_struct *athread; struct ceph_kmsgr { void *m_parent; struct task_struct *athread; - struct radix_tree_root mpipes; /* other nodes talk to */ + struct radix_tree_root connections; /* see get_connection() */ + struct list_head accepting; /* connections that aren't open yet */ }; struct ceph_message { - atomic_t nref; - int mflags; - struct ceph_message_header *msghdr; /* header */ - __u32 chunklens[2]; + struct ceph_message_header hdr; /* header */ + __u32 chunklen[2]; struct ceph_bufferlist payload; - struct list_head m_list_head; + + struct list_head list_head; + atomic_t nref; }; /* current state of connection, probably won't need all these.. */ enum ceph_con_state { + NEW, ACCEPTING, CONNECTING, OPEN, + REJECTING, + CLOSED, READ_PENDING, READING, @@ -52,31 +56,31 @@ enum ceph_con_state { struct ceph_connection { struct socket *sock; /* connection socket */ - /* TDB: may need a mutex here depending if */ - spinlock_t con_lock; + + atomic_t nref; + spinlock_t con_lock; /* TDB: may need a mutex here depending if */ + struct ceph_message_addr peer_addr; /* peer address */ + struct list_head list_head; enum ceph_con_state state; __u32 connect_seq; __u32 out_seq; /* last message queued for send */ __u32 in_seq, in_seq_acked; /* last message received, acked */ - /* out queue */ -/* note: need to adjust queues because we have a work queue for the message */ - spinlock_t out_queue_lock; + /* note: need to adjust queues because we have a work queue for the message */ struct list_head out_queue; - struct ceph_bufferlist out_partial; + struct ceph_bufferlist out_partial; /* refereces existing bufferlists; do not free() */ struct ceph_bufferlist_iterator out_pos; struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */ /* partially read message contents */ - char in_tag; /* ack or msg */ + char in_tag; /* READY (accepting, or no in-progress read) or ACK or MSG */ + int in_base_pos; /* for ack seq, or msg headers, or accept handshake */ __u32 in_partial_ack; - int in_base_pos; /* for ack seq, or msg header */ struct ceph_message *in_partial; struct ceph_bufferlist_iterator in_pos; /* for msg payload */ - struct work_struct rwork; /* received work */ struct work_struct swork; /* send work */ int retries; diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index b6a0ac49756e8..8edd647b6ccf5 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -23,6 +23,118 @@ static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_close = CEPH_MSGR_TAG_CLOSE; +/* + * connections + */ + +/* + * create a new connection. initial state is NEW. + */ +static struct ceph_connection *new_connection() +{ + struct ceph_connection *con; + con = kmalloc(sizeof(struct ceph_connection)); + if (con == NULL) return 0; + memset(&con, 0, sizeof(con)); + + spin_init(&con->con_lock); + INIT_WORK(&con->rwork, ceph_reader); /* setup work structure */ + + atomic_inc(&con->nref); + return con; +} + +/* + * get an existing connection, if any, for given addr + */ +static struct ceph_connection *get_connection(struct ceph_kmsgr *msgr, struct ceph_entity_addr *addr) +{ + unsigned long key; + struct ceph_connection *con; + struct list_head *head, *p; + + /* + * the radix_tree has an unsigned long key and void * value. since + * ceph_entity_addr is bigger than that, we use a trivial hash key, and + * point to a list_head in ceph_connection, as you would with a hash + * table. in the rare event that the trivial hash collides, we just + * traverse the (short) list. + */ + key = *(unsigned long*)&addr->addr.sin_addr.s_addr; + key ^= addr->addr.sin_port; + + /* existing? */ + spin_lock(&msgr->lock); + head = radix_lookup(&msgr->connections, key); + if (head) { + list_for_each(p, head) { + con = list_entry(p, struct ceph_connection, list_head); + if (con->peer_addr == addr) { + atomic_inc(&con->nref); + goto out; + } + } + } + con = NULL; +out: + spin_unlock(&msgr->lock); + return con; +} + +/* + * drop a reference + */ +static void put_connection(struct ceph_connection *con) +{ + if (atomic_dec_and_test(&con->nref)) { + /* FIXME close socket? */ + kfree(con); + } +} + +/* + * add to connections tree + */ +static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con) +{ + unsigned long key; + struct list_head *head, *p; + + key = *(unsigned long*)&addr->addr.sin_addr.s_addr; + key ^= addr->addr.sin_port; + + spin_lock(&msgr->lock); + head = radix_lookup(&msgr->connections, key); + if (head) { + list_add(&head, &con->list_head); + } else { + list_init(&con->list_head); /* empty */ + radix_insert(&msgr->connections, key, &con->list_head); + } + spin_unlock(&msgr->lock); + + /* inc ref count */ + atomic_inc(&con->nref); + + return con; +} + +/* + * replace another connection + */ +static void replace_connection(struct ceph_kmsgr *msgr, struct ceph_connection *old, struct ceph_connection *new) +{ + spin_lock(&msgr->lock); + list_add(&new->list_head, &old->list_head); + list_remove(&old->list_head); + spin_unlock(&msgr->lock); + put_connection(old); /* dec reference count */ +} + + + + + /* * blocking versions @@ -111,7 +223,7 @@ static int ceph_send_message(struct ceph_message *message, struct socket *sd) /* * non-blocking versions * - * these are called while holding a lock on the connection + * these should be called while holding con->con_lock */ /* @@ -181,8 +293,29 @@ static void prepare_write_ack(struct ceph_connection *con) ceph_bl_init(&con->out_partial); ceph_bl_iterator_init(&con->out_pos); - ceph_bl_append_copy(&con->out_partial, &tag_ack, 1); - ceph_bl_append_copy(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked)); + ceph_bl_append_ref(&con->out_partial, &tag_ack, 1); + ceph_bl_append_ref(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked)); +} + +static void prepare_write_accept_announce(struct ceph_connection *con) +{ + ceph_bl_init(&con->out_partial); + ceph_bl_iterator_init(&con->out_pos); + ceph_bl_append_ref(&con->out_partial, &con->msgr->addr, sizeof(con->msgr->addr)); +} + +static void prepare_write_accept_ready(struct ceph_connection *con) +{ + ceph_bl_init(&con->out_partial); + ceph_bl_iterator_init(&con->out_pos); + ceph_bl_append_ref(&con->out_partial, &tag_ready, 1); +} +static void prepare_write_accept_reject(struct ceph_connection *con) +{ + ceph_bl_init(&con->out_partial); + ceph_bl_iterator_init(&con->out_pos); + ceph_bl_append_ref(&con->out_partial, &tag_reject, 1); + ceph_bl_append_ref(&con->out_partial, &con->connect_seq, sizeof(con->connect_seq)); } /* @@ -193,18 +326,26 @@ static int try_write(struct ceph_connection *con) int ret; more: + /* data queued? */ if (con->out_partial.b_kvlen) { ret = write_partial(con); if (ret == 0) return 0; + /* error or success */ /* clean up */ ceph_bl_init(&con->out_partial); ceph_bl_iterator_init(&con->out_pos); + if (con->state == REJECTING) { + /* FIXME do something else here, pbly? */ + list_remove(&con->list_head); + con->state = CLOSED; + } + if (ret < 0) return ret; /* error */ } - /* what next? */ + /* anything else pending? */ if (con->in_seq > con->in_seq_acked) { prepare_write_ack(con); goto more; @@ -276,6 +417,28 @@ static int read_ack_partial(struct ceph_connection *con) return 1; /* done */ } + +static int read_accept_partial(struct ceph_connection *con) +{ + /* peer addr */ + while (con->in_base_pos < sizeof(con->peer_addr)) { + int left = sizeof(con->peer_addr) - con->in_base_pos; + ret = _read(socket, (char*)&con->peer_addr + con->in_base_pos, left); + if (ret <= 0) return ret; + con->in_base_pos += ret; + } + + /* connect_seq */ + while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) { + int off = in_base_pos - sizeof(con->peer_addr); + left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - in_base_pos; + ret = _read(socket, (char*)&m->connect_seq + off, left); + if (ret <= 0) return ret; + con->in_base_pos += ret; + } + return 1; /* done */ +} + /* * prepare to read a message */ @@ -311,11 +474,60 @@ static void process_ack(struct ceph_connection *con, __u32 ack) } } +static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con) +{ + struct ceph_connection *existing; + spin_lock(&msgr->lock); + existing = get_connection(msgr, &con->peer_addr); + if (existing) { + spin_lock(&existing->con_lock); + if ((existing->state == CONNECTING && compare_addr(&msgr->addr, &con->peer_addr)) || + (existing->state == OPEN && con->connect_seq == existing->connect_seq)) { + /* replace existing with new connection */ + replace_connection(msgr, existing, con); + /* steal message queue */ + list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */ + con->out_seq = existing->out_seq; + con->state = OPEN; + existing->state = CLOSED; + } else { + /* reject new connection */ + con->state = REJECTING; + con->connect_seq = existing->connect_seq; /* send this with the reject */ + } + spin_unlock(&existing->con_lock); + put_connection(existing); + } else { + add_connection(con); + con->state = OPEN; + } + spin_unlock(&msgr->lock); + + /* the result? */ + if (con->state == REJECTING) + prepare_write_accept_reject(con); + else + prepare_write_accept_ready(con); +} + + +/* + * call when data is available on the socket + */ static int try_read(struct ceph_connection *con) { int ret = -1; more: + if (con->state == CLOSED) return -1; + if (con->state == ACCEPTING) { + ret = read_accept_partial(con); + if (ret <= 0) return ret; + /* accepted */ + process_accept(con); + goto more; + } + if (con->in_tag == CEPH_MSGR_TAG_READY) { ret = _read(socket, &con->in_tag, 1); if (ret <= 0) return ret; @@ -382,18 +594,21 @@ static int ceph_accepter(void *unusedfornow) printk(KERN_INFO "accepted connection \n"); set_current_state(TASK_INTERRUPTIBLE); /* initialize the ceph connection */ - con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL); + con = new_connection(NULL); if (con == NULL) { printk(KERN_INFO "malloc failure\n"); sock_release(new_sd); break; } - con->sock = new_sd; - con->state = READ_PENDING; - /* setup work structure */ - INIT_WORK(&con->rwork, ceph_reader); - /* hand off to worker threads , read pending */ - queue_work(recv_wq, &con->rwork); + con->socket = new_sd; + con->state = ACCEPTING; + con->in_tag = CEPH_MSGR_TAG_READY; + + prepare_write_accept_announce(con); + list_add(&msgr->accepting, &con->list_head); + + /* hand off to worker threads , read pending */ + /*?? queue_work(recv_wq, &con->rwork);*/ } set_current_state(TASK_RUNNING); printk(KERN_INFO "kernel thread exiting\n"); -- 2.39.5