From 65ec3b331ee8efe52f51cea48a96fc2b9dce8e5e Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 7 Nov 2007 17:40:08 +0000 Subject: [PATCH] closer to compiling2 git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2030 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/kmsg.h | 1 + trunk/ceph/kernel/messenger.c | 32 +++++++++++++++++--------------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/trunk/ceph/kernel/kmsg.h b/trunk/ceph/kernel/kmsg.h index 813fe1a050fbf..0bac338b44f57 100644 --- a/trunk/ceph/kernel/kmsg.h +++ b/trunk/ceph/kernel/kmsg.h @@ -57,6 +57,7 @@ enum ceph_connection_state { }; struct ceph_connection { + struct ceph_kmsgr *msgr; struct socket *sock; /* connection socket */ atomic_t nref; diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 4ffb5712d930a..babf282e238ef 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -29,13 +29,15 @@ static char tag_close = CEPH_MSGR_TAG_CLOSE; /* * create a new connection. initial state is NEW. */ -static struct ceph_connection *new_connection() +static struct ceph_connection *new_connection(struct ceph_kmsgr *msgr) { struct ceph_connection *con; con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL); if (con == NULL) return 0; memset(&con, 0, sizeof(con)); + con->msgr = msgr; + spin_lock_init(&con->con_lock); /*INIT_WORK(&con->rwork, ceph_reader);*/ /* setup work structure */ @@ -333,7 +335,7 @@ static int read_message_partial(struct ceph_connection *con) while (con->in_base_pos < sizeof(struct ceph_message_header) + chunkbytes) { int off = con->in_base_pos - sizeof(struct ceph_message_header); left = chunkbytes + sizeof(struct ceph_message_header) - con->in_base_pos; - ret = _read(con->sock, (char*)m->hdr.chunklens + off, left); + ret = _read(con->sock, (char*)m->chunklens + off, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -363,7 +365,7 @@ static int read_ack_partial(struct ceph_connection *con) { while (con->in_base_pos < sizeof(con->in_partial_ack)) { int left = sizeof(con->in_partial_ack) - con->in_base_pos; - ret = _read(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left); + int ret = _read(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -376,7 +378,7 @@ static int read_accept_partial(struct ceph_connection *con) /* peer addr */ while (con->in_base_pos < sizeof(con->peer_addr)) { int left = sizeof(con->peer_addr) - con->in_base_pos; - ret = _read(con->sock, (char*)&con->peer_addr + con->in_base_pos, left); + int ret = _read(con->sock, (char*)&con->peer_addr + con->in_base_pos, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -384,7 +386,7 @@ static int read_accept_partial(struct ceph_connection *con) /* connect_seq */ while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) { int off = con->in_base_pos - sizeof(con->peer_addr); - left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos; + int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos; ret = _read(con->sock, (char*)&con->connect_seq + off, left); if (ret <= 0) return ret; con->in_base_pos += ret; @@ -430,19 +432,19 @@ static void process_ack(struct ceph_connection *con, __u32 ack) /* * call after a new connection's handshake has completed */ -static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con) +static void process_accept(struct ceph_connection *con) { struct ceph_connection *existing; /* do we already have a connection for this peer? */ - spin_lock(&msgr->con_lock); - existing = get_connection(msgr, &con->peer_addr); + spin_lock(&con->msgr->con_lock); + existing = get_connection(con->msgr, &con->peer_addr); if (existing) { spin_lock(&existing->con_lock); - if ((existing->state == CONNECTING && compare_addr(&msgr->addr, &con->peer_addr)) || + if ((existing->state == CONNECTING && compare_addr(&con->msgr->addr, &con->peer_addr)) || (existing->state == OPEN && con->connect_seq == existing->connect_seq)) { /* replace existing with new connection */ - replace_connection(msgr, existing, con); + replace_connection(con->msgr, existing, con); /* steal message queue */ list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */ con->out_seq = existing->out_seq; @@ -456,10 +458,10 @@ static void process_accept(struct ceph_kmsgr *msgr, struct ceph_connection *con) spin_unlock(&existing->con_lock); put_connection(existing); } else { - add_connection_accepted(msgr, con); + add_connection_accepted(con->msgr, con); con->state = OPEN; } - spin_unlock(&msgr->con_lock); + spin_unlock(&con->msgr->con_lock); /* the result? */ if (con->state == REJECTING) @@ -485,7 +487,7 @@ more: ret = read_accept_partial(con); if (ret <= 0) return ret; /* accepted */ - process_accept(msgr, con); + process_accept(con); goto more; } @@ -506,7 +508,7 @@ more: ret = read_message_partial(con); if (ret <= 0) return ret; /* got a full message! */ - msgr->dispatch(msgr->parent, con->in_partial); + msgr->dispatch(con->msgr->parent, con->in_partial); ceph_put_msg(con->in_partial); con->in_partial = 0; con->in_tag = CEPH_MSGR_TAG_READY; @@ -555,7 +557,7 @@ static int ceph_accepter(struct ceph_kmsgr *msgr) printk(KERN_INFO "accepted connection \n"); set_current_state(TASK_INTERRUPTIBLE); /* initialize the msgr connection */ - con = new_connection(); + con = new_connection(msgr); if (con == NULL) { printk(KERN_INFO "malloc failure\n"); sock_release(new_sd); -- 2.39.5