struct ceph_kmsgr {
void *m_parent;
struct task_struct *athread;
- struct radix_tree_root mpipes; /* other nodes talk to */
+ struct radix_tree_root connections; /* see get_connection() */
+ struct list_head accepting; /* connections that aren't open yet */
+ spinlock_t lock; /* protects connections, accepting */
+ struct ceph_entity_addr addr; /* our advertised address */
};
struct ceph_message {
- atomic_t nref;
- int mflags;
- struct ceph_message_header *msghdr; /* header */
- __u32 chunklens[2];
+ struct ceph_message_header hdr; /* header */
+ __u32 chunklen[2];
struct ceph_bufferlist payload;
- struct list_head m_list_head;
+
+ struct list_head list_head;
+ atomic_t nref;
};
/* current state of connection, probably won't need all these.. */
enum ceph_con_state {
+ NEW,
ACCEPTING,
CONNECTING,
OPEN,
+ REJECTING,
+ CLOSED,
READ_PENDING,
READING,
struct ceph_connection {
struct socket *sock; /* connection socket */
- /* TDB: may need a mutex here depending if */
- spinlock_t con_lock;
+
+ atomic_t nref;
+ spinlock_t con_lock; /* TBD: may need a mutex here, depending on usage */
+ struct ceph_kmsgr *msgr; /* owning messenger */
+ struct ceph_entity_addr peer_addr; /* peer address */
+ struct list_head list_head;
enum ceph_con_state state;
__u32 connect_seq;
__u32 out_seq; /* last message queued for send */
__u32 in_seq, in_seq_acked; /* last message received, acked */
-
/* out queue */
-/* note: need to adjust queues because we have a work queue for the message */
- spinlock_t out_queue_lock;
+ /* note: need to adjust queues because we have a work queue for the message */
struct list_head out_queue;
- struct ceph_bufferlist out_partial;
+ struct ceph_bufferlist out_partial; /* references existing bufferlists; do not free() */
struct ceph_bufferlist_iterator out_pos;
struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */
/* partially read message contents */
- char in_tag; /* ack or msg */
+ char in_tag; /* READY (accepting, or no in-progress read) or ACK or MSG */
+ int in_base_pos; /* for ack seq, or msg headers, or accept handshake */
__u32 in_partial_ack;
- int in_base_pos; /* for ack seq, or msg header */
struct ceph_message *in_partial;
struct ceph_bufferlist_iterator in_pos; /* for msg payload */
-
struct work_struct rwork; /* received work */
struct work_struct swork; /* send work */
int retries;
static char tag_close = CEPH_MSGR_TAG_CLOSE;
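+/*
+ * tags referenced by the accept path below. CEPH_MSGR_TAG_READY is
+ * already used by the read path; CEPH_MSGR_TAG_REJECT is assumed to
+ * exist alongside the other CEPH_MSGR_TAG_* constants.
+ */
+static char tag_ready = CEPH_MSGR_TAG_READY;
+static char tag_reject = CEPH_MSGR_TAG_REJECT;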
+/*
+ * connections
+ */
+
+/*
+ * create a new connection. initial state is NEW.
+ */
+static struct ceph_connection *new_connection(struct ceph_kmsgr *msgr)
+{
+	struct ceph_connection *con;
+	con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+	if (con == NULL)
+		return NULL;
+	memset(con, 0, sizeof(*con));	/* state = NEW (first enum value) */
+	con->msgr = msgr;
+
+	spin_lock_init(&con->con_lock);
+	INIT_WORK(&con->rwork, ceph_reader); /* setup work structure */
+
+	atomic_set(&con->nref, 1);
+	return con;
+}
+
+/*
+ * get an existing connection, if any, for given addr
+ */
+static struct ceph_connection *get_connection(struct ceph_kmsgr *msgr, struct ceph_entity_addr *addr)
+{
+	unsigned long key;
+	struct ceph_connection *con;
+	struct list_head *head, *p;
+
+	/*
+	 * the radix_tree has an unsigned long key and void * value. since
+	 * ceph_entity_addr is bigger than that, we use a trivial hash key, and
+	 * point to a list_head in ceph_connection, as you would with a hash
+	 * table. in the rare event that the trivial hash collides, we just
+	 * traverse the (short) list.
+	 */
+	key = (unsigned long)addr->addr.sin_addr.s_addr;
+	key ^= addr->addr.sin_port;
+
+	/* existing? */
+	spin_lock(&msgr->lock);
+	head = radix_tree_lookup(&msgr->connections, key);
+	if (head) {
+		/* the stored pointer is itself a member of the ring, so walk
+		 * the whole ring, head included */
+		p = head;
+		do {
+			con = list_entry(p, struct ceph_connection, list_head);
+			if (memcmp(&con->peer_addr, addr, sizeof(*addr)) == 0) {
+				atomic_inc(&con->nref);
+				goto out;
+			}
+			p = p->next;
+		} while (p != head);
+	}
+	con = NULL;
+out:
+	spin_unlock(&msgr->lock);
+	return con;
+}
+
+/*
+ * drop a reference
+ */
+static void put_connection(struct ceph_connection *con)
+{
+ if (atomic_dec_and_test(&con->nref)) {
+ /* FIXME close socket? */
+ kfree(con);
+ }
+}
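+
+/*
+ * sketch of the intended refcount discipline, to illustrate the helpers
+ * above: new_connection() and get_connection() each return with a
+ * reference held, add_connection() takes one of its own for the tree,
+ * and every caller eventually drops its reference with put_connection():
+ *
+ *	con = get_connection(msgr, addr);
+ *	if (!con) {
+ *		con = new_connection(msgr);
+ *		con->peer_addr = *addr;
+ *		add_connection(msgr, con);
+ *	}
+ *	... queue message, kick swork ...
+ *	put_connection(con);
+ */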
+
+/*
+ * add to connections tree
+ */
+static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+{
+	unsigned long key;
+	struct list_head *head;
+
+	key = (unsigned long)con->peer_addr.addr.sin_addr.s_addr;
+	key ^= con->peer_addr.addr.sin_port;
+
+	spin_lock(&msgr->lock);
+	head = radix_tree_lookup(&msgr->connections, key);
+	if (head) {
+		list_add(&con->list_head, head);
+	} else {
+		INIT_LIST_HEAD(&con->list_head); /* sole entry for this key */
+		radix_tree_insert(&msgr->connections, key, &con->list_head);
+	}
+	spin_unlock(&msgr->lock);
+
+	/* inc ref count */
+	atomic_inc(&con->nref);
+}
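+
+/*
+ * a possible shared helper (a sketch only, not wired in): get_connection()
+ * and add_connection() above open-code this same trivial hash.
+ */
+static unsigned long hash_addr(struct ceph_entity_addr *addr)
+{
+	return (unsigned long)addr->addr.sin_addr.s_addr ^ addr->addr.sin_port;
+}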
+
+/*
+ * replace another connection
+ */
+static void replace_connection(struct ceph_kmsgr *msgr, struct ceph_connection *old, struct ceph_connection *new)
+{
+	spin_lock(&msgr->lock);
+	list_add(&new->list_head, &old->list_head);
+	list_del(&old->list_head);
+	/* FIXME: if the radix tree slot stores &old->list_head, it now
+	 * dangles; the slot needs updating, or a standalone list head. */
+	spin_unlock(&msgr->lock);
+	put_connection(old); /* dec reference count */
+}
+
+
+
+
+
/*
* blocking versions
/*
* non-blocking versions
*
- * these are called while holding a lock on the connection
+ * these should be called while holding con->con_lock
*/
/*
ceph_bl_init(&con->out_partial);
ceph_bl_iterator_init(&con->out_pos);
- ceph_bl_append_copy(&con->out_partial, &tag_ack, 1);
- ceph_bl_append_copy(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked));
+ ceph_bl_append_ref(&con->out_partial, &tag_ack, 1);
+ ceph_bl_append_ref(&con->out_partial, &con->in_seq_acked, sizeof(con->in_seq_acked));
+}
+
+static void prepare_write_accept_announce(struct ceph_connection *con)
+{
+ ceph_bl_init(&con->out_partial);
+ ceph_bl_iterator_init(&con->out_pos);
+ ceph_bl_append_ref(&con->out_partial, &con->msgr->addr, sizeof(con->msgr->addr));
+}
+
+static void prepare_write_accept_ready(struct ceph_connection *con)
+{
+ ceph_bl_init(&con->out_partial);
+ ceph_bl_iterator_init(&con->out_pos);
+ ceph_bl_append_ref(&con->out_partial, &tag_ready, 1);
+}
+
+static void prepare_write_accept_reject(struct ceph_connection *con)
+{
+ ceph_bl_init(&con->out_partial);
+ ceph_bl_iterator_init(&con->out_pos);
+ ceph_bl_append_ref(&con->out_partial, &tag_reject, 1);
+ ceph_bl_append_ref(&con->out_partial, &con->connect_seq, sizeof(con->connect_seq));
}
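+
+/*
+ * for reference, the accept handshake these helpers implement (server's view):
+ *
+ *	write: our entity addr			(prepare_write_accept_announce)
+ *	read:  peer addr, peer connect_seq	(read_accept_partial)
+ *	write: TAG_READY			(prepare_write_accept_ready)
+ *	   or: TAG_REJECT + our connect_seq	(prepare_write_accept_reject)
+ */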
/*
int ret;
more:
+ /* data queued? */
if (con->out_partial.b_kvlen) {
ret = write_partial(con);
if (ret == 0) return 0;
+ /* error or success */
/* clean up */
ceph_bl_init(&con->out_partial);
ceph_bl_iterator_init(&con->out_pos);
+ if (con->state == REJECTING) {
+ /* FIXME: do something else here, probably? */
+ list_del(&con->list_head);
+ con->state = CLOSED;
+ }
+
if (ret < 0) return ret; /* error */
}
- /* what next? */
+ /* anything else pending? */
if (con->in_seq > con->in_seq_acked) {
prepare_write_ack(con);
goto more;
return 1; /* done */
}
+
+static int read_accept_partial(struct ceph_connection *con)
+{
+	int ret, left, off;
+
+	/* peer addr */
+	while (con->in_base_pos < sizeof(con->peer_addr)) {
+		left = sizeof(con->peer_addr) - con->in_base_pos;
+		ret = _read(con->sock, (char *)&con->peer_addr + con->in_base_pos, left);
+		if (ret <= 0)
+			return ret;
+		con->in_base_pos += ret;
+	}
+
+	/* connect_seq */
+	while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) {
+		off = con->in_base_pos - sizeof(con->peer_addr);
+		left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos;
+		ret = _read(con->sock, (char *)&con->connect_seq + off, left);
+		if (ret <= 0)
+			return ret;
+		con->in_base_pos += ret;
+	}
+	return 1; /* done */
+}
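+
+/*
+ * a possible helper for the resumable-read pattern above (a sketch,
+ * not used yet): read into the tail of 'object' (of 'size' bytes) until
+ * con->in_base_pos reaches 'end', resuming across calls. 'end' is a
+ * cumulative offset, matching how in_base_pos spans multiple fields, so
+ * read_accept_partial() would collapse to two calls.
+ */
+static int read_partial(struct ceph_connection *con, int end, int size, void *object)
+{
+	while (con->in_base_pos < end) {
+		int left = end - con->in_base_pos;
+		int have = size - left;
+		int ret = _read(con->sock, (char *)object + have, left);
+		if (ret <= 0)
+			return ret;
+		con->in_base_pos += ret;
+	}
+	return 1; /* done */
+}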
+
/*
* prepare to read a message
*/
}
}
+static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+{
+	struct ceph_connection *existing;
+
+	/* note: get_connection/replace_connection/add_connection take
+	 * msgr->lock themselves; FIXME small race between lookup and add */
+	existing = get_connection(msgr, &con->peer_addr);
+	if (existing) {
+		spin_lock(&existing->con_lock);
+		if ((existing->state == CONNECTING && compare_addr(&msgr->addr, &con->peer_addr)) ||
+		    (existing->state == OPEN && con->connect_seq == existing->connect_seq)) {
+			/* replace existing with new connection */
+			replace_connection(msgr, existing, con);
+			/* steal existing's message queue */
+			list_splice_init(&existing->out_queue, &con->out_queue); /* fixme order */
+			con->out_seq = existing->out_seq;
+			con->state = OPEN;
+			existing->state = CLOSED;
+		} else {
+			/* reject new connection */
+			con->state = REJECTING;
+			con->connect_seq = existing->connect_seq; /* send this with the reject */
+		}
+		spin_unlock(&existing->con_lock);
+		put_connection(existing);
+	} else {
+		add_connection(msgr, con);
+		con->state = OPEN;
+	}
+
+	/* the result? */
+	if (con->state == REJECTING)
+		prepare_write_accept_reject(con);
+	else
+		prepare_write_accept_ready(con);
+}
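+
+/*
+ * to illustrate the cases process_accept() distinguishes: if both ends
+ * connect to each other at the same time, each side sees an existing
+ * connection in CONNECTING, and the address comparison decides whose
+ * socket survives, so exactly one connection remains. if a peer
+ * reconnects after a dropped connection, existing is OPEN and a matching
+ * connect_seq lets the new socket replace it and inherit the queued
+ * messages. anything else is answered with TAG_REJECT plus our
+ * connect_seq, so the peer can retry with the correct sequence number.
+ */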
+
+
+/*
+ * call when data is available on the socket
+ */
static int try_read(struct ceph_connection *con)
{
int ret = -1;
more:
+ if (con->state == CLOSED) return -1;
+ if (con->state == ACCEPTING) {
+ ret = read_accept_partial(con);
+ if (ret <= 0) return ret;
+ /* accepted */
+ process_accept(con->msgr, con);
+ goto more;
+ }
+
if (con->in_tag == CEPH_MSGR_TAG_READY) {
ret = _read(socket, &con->in_tag, 1);
if (ret <= 0) return ret;
printk(KERN_INFO "accepted connection \n");
set_current_state(TASK_INTERRUPTIBLE);
/* initialize the ceph connection */
- con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+ con = new_connection(msgr);
if (con == NULL) {
printk(KERN_INFO "malloc failure\n");
sock_release(new_sd);
break;
}
- con->sock = new_sd;
- con->state = READ_PENDING;
- /* setup work structure */
- INIT_WORK(&con->rwork, ceph_reader);
- /* hand off to worker threads , read pending */
- queue_work(recv_wq, &con->rwork);
+ con->sock = new_sd;
+ con->state = ACCEPTING;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+
+ prepare_write_accept_announce(con);
+ list_add(&con->list_head, &msgr->accepting);
+
+ /* hand off to worker threads; read pending */
+ /*?? queue_work(recv_wq, &con->rwork);*/
}
set_current_state(TASK_RUNNING);
printk(KERN_INFO "kernel thread exiting\n");