key ^= addr->addr.sin_port;
/* existing? */
- spin_lock(&msgr->lock);
- head = radix_lookup(&msgr->connections, key);
+ spin_lock(&msgr->con_lock);
+ head = radix_lookup(&msgr->con_open, key);
if (head) {
list_for_each(p, head) {
- con = list_entry(p, struct ceph_connection, list_head);
+ con = list_entry(p, struct ceph_connection, list_peer);
if (con->peer_addr == addr) {
atomic_inc(&con->nref);
goto out;
}
con = NULL;
out:
- spin_unlock(&msgr->lock);
+ spin_unlock(&msgr->con_lock);
return con;
}
/*
* add to connections tree
*/
-static void add_connection(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+static void add_connection_accepted(struct ceph_kmsgr *msgr, struct ceph_connection *con)
{
unsigned long key;
struct list_head *head, *p;
key = *(unsigned long*)&addr->addr.sin_addr.s_addr;
key ^= addr->addr.sin_port;
- spin_lock(&msgr->lock);
- head = radix_lookup(&msgr->connections, key);
+ /* inc ref count */
+ atomic_inc(&con->nref);
+
+ spin_lock(&msgr->con_lock);
+ head = radix_lookup(&msgr->con_open, key);
if (head) {
- list_add(&head, &con->list_head);
+ list_add(&head, &con->list_peer);
} else {
- list_init(&con->list_head); /* empty */
- radix_insert(&msgr->connections, key, &con->list_head);
+ list_init(&con->list_peer); /* empty */
+ radix_insert(&msgr->connections, key, &con->list_peer);
}
- spin_unlock(&msgr->lock);
-
- /* inc ref count */
- atomic_inc(&con->nref);
+ spin_unlock(&msgr->con_lock);
return con;
}
+static void add_connection_accepting(struct ceph_kmsgr *msgr, struct ceph_connection *con)
+{
+ atomic_inc(&con->nref);
+ spin_lock(&msgr->con_lock);
+ list_add(&msgr->con_accepting, &con->list_head);
+ list_add(&msgr->con_all, &con->list_head);
+ spin_unlock(&msgr->con_lock);
+}
+
+
/*
* replace another connection
* (old and new should be for the _same_ peer, and thus in the same pos in the radix tree)
*/
static void replace_connection(struct ceph_kmsgr *msgr, struct ceph_connection *old, struct ceph_connection *new)
{
- spin_lock(&msgr->lock);
- list_add(&new->list_head, &old->list_head);
- list_remove(&old->list_head);
- spin_unlock(&msgr->lock);
+ spin_lock(&msgr->con_lock);
+ list_add(&new->list_peer, &old->list_peer);
+ list_remove(&old->list_peer);
+ spin_unlock(&msgr->con_lock);
put_connection(old); /* dec reference count */
}
if (con->state == REJECTING) {
/* FIXME do something else here, pbly? */
- list_remove(&con->list_head);
+ list_remove(&con->list_peer);
+ list_remove(&con->list_all);
con->state = CLOSED;
+ put_connection(con);
}
if (ret < 0) return ret; /* error */
struct ceph_connection *existing;
/* do we already have a connection for this peer? */
- spin_lock(&msgr->lock);
+ spin_lock(&msgr->con_lock);
existing = get_connection(msgr, &con->peer_addr);
if (existing) {
spin_lock(&existing->con_lock);
spin_unlock(&existing->con_lock);
put_connection(existing);
} else {
- add_connection(con);
+ add_connection_accepted(con);
con->state = OPEN;
}
- spin_unlock(&msgr->lock);
+ spin_unlock(&msgr->con_lock);
/* the result? */
if (con->state == REJECTING)
ret = read_message_partial(con);
if (ret <= 0) return ret;
/* got a full message! */
- ceph_dispatch(con->msgr, con->in_partial);
+ msgr->dispatch(msgr->parent, con->in_partial);
cphe_put_msg(con->in_partial);
con->in_partial = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
printk(KERN_INFO "accepted connection \n");
set_current_state(TASK_INTERRUPTIBLE);
/* initialize the msgr connection */
- con = new_connection(NULL);
+ con = new_connection();
if (con == NULL) {
printk(KERN_INFO "malloc failure\n");
sock_release(new_sd);
con->in_tag = CEPH_MSGR_TAG_READY;
prepare_write_accept_announce(con);
- list_add(&msgr->accepting, &con->list_head);
+
+ add_connection_accepting(msgr, con);
/* hand off to worker threads , read pending */
/*?? queue_work(recv_wq, &con->rwork);*/
return(0);
}
-void ceph_dispatch(struct ceph_client *client, struct ceph_message *msg)
-{
- /* deliver the message */
- switch (msg->hdr.type) {
- /* mds client */
- case CEPH_MSG_MDSMAP:
- ceph_mdsc_handle_map(client->mds_client, msg);
- break;
- case CEPH_MSG_CLIENT_REPLY:
- ceph_mdsc_handle_reply(client->mds_client, msg);
- break;
- case CEPH_MSG_CLIENT_FORWARD:
- ceph_mdsc_handle_forward(client->mds_client, msg);
- break;
-
- /* osd client */
- case CEPH_MSG_OSDMAP:
- ceph_osdc_handle_map(client->osd_client, msg);
- break;
- case CEPH_MSG_OSD_OPREPLY:
- ceph_osdc_handle_reply(client->osd_client, msg);
- break;
-
- default:
- printk(KERN_INFO "unknown message type %d\n", msg->hdr.type);
- ceph_put_msg(msg);
- }
-}
-
int ceph_work_init(void)
{