int ceph_debug_msgr = 50;
#define DOUT_VAR ceph_debug_msgr
-#define DOUT_PREFIX "msgr: "
+#define DOUT_PREFIX "msgr: "
#include "super.h"
/* static tag bytes */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
static void try_read(struct work_struct *);
static void try_write(struct work_struct *);
static void try_accept(struct work_struct *);
-#else
-static void try_read(void *);
-static void try_write(void *);
-static void try_accept(void *);
-#endif
* connections
*/
-/*
+/*
* create a new connection. initial state is NEW.
*/
static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
{
struct ceph_connection *con;
con = kzalloc(sizeof(struct ceph_connection), GFP_KERNEL);
- if (con == NULL)
+ if (con == NULL)
return NULL;
con->msgr = msgr;
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
INIT_WORK(&con->rwork, try_read);
INIT_DELAYED_WORK(&con->swork, try_write);
-#else
- INIT_WORK(&con->rwork, try_read, con);
- INIT_WORK(&con->swork, try_write, con);
-#endif
return con;
}
* table. in the rare event that the trivial hash collides, we just
* traverse the (short) list.
*/
-static unsigned long hash_addr(struct ceph_entity_addr *addr)
+static unsigned long hash_addr(struct ceph_entity_addr *addr)
{
unsigned long key;
key = *(__u32*)&addr->ipaddr.sin_addr.s_addr;
return key;
}
-/*
+/*
* get an existing connection, if any, for given addr
*/
-static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
+static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
+ struct ceph_entity_addr *addr)
{
struct ceph_connection *con = NULL;
struct list_head *head, *p;
/* existing? */
head = radix_tree_lookup(&msgr->con_open, key);
- if (head == NULL)
+ if (head == NULL)
goto out;
con = list_entry(head, struct ceph_connection, list_bucket);
if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
-/*
+/*
* drop a reference
*/
-static void put_connection(struct ceph_connection *con)
+static void put_connection(struct ceph_connection *con)
{
dout(20, "put_connection nref = %d\n", atomic_read(&con->nref));
if (atomic_dec_and_test(&con->nref)) {
}
}
-/*
+/*
* add to connections tree
*/
-static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void __add_connection(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
{
struct list_head *head;
unsigned long key = hash_addr(&con->peer_addr);
head = radix_tree_lookup(&msgr->con_open, key);
if (head) {
- dout(20, "add_connection %p in existing bucket %lu head %p\n", con, key, head);
+ dout(20, "add_connection %p in existing bucket %lu head %p\n",
+ con, key, head);
list_add(&con->list_bucket, head);
} else {
- dout(20, "add_connection %p in new bucket %lu head %p\n", con, key, &con->list_bucket);
+ dout(20, "add_connection %p in new bucket %lu head %p\n", con,
+ key, &con->list_bucket);
INIT_LIST_HEAD(&con->list_bucket); /* empty */
radix_tree_insert(&msgr->con_open, key, &con->list_bucket);
}
}
-static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void add_connection_accepting(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
{
atomic_inc(&con->nref);
spin_lock(&msgr->con_lock);
* remove connection from all list.
* also, from con_open radix tree, if it should have been there
*/
-static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void __remove_connection(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
{
unsigned long key;
void **slot, *val;
key = hash_addr(&con->peer_addr);
if (list_empty(&con->list_bucket)) {
/* last one */
- dout(20, "__remove_connection %p and removing bucket %lu\n", con, key);
+ dout(20, "__remove_connection %p and bucket %lu\n",
+ con, key);
radix_tree_delete(&msgr->con_open, key);
} else {
slot = radix_tree_lookup_slot(&msgr->con_open, key);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
val = radix_tree_deref_slot(slot);
-#else
- val = *slot;
-#endif
- dout(20, "__remove_connection %p from bucket %lu head %p\n", con, key, val);
+ dout(20, "__remove_connection %p from bucket %lu "
+ "head %p\n", con, key, val);
if (val == &con->list_bucket) {
- dout(20, "__remove_connection adjusting bucket ptr"
- " for %lu to next item, %p\n", key,
+ dout(20, "__remove_connection adjusting bucket"
+ " for %lu to next item, %p\n", key,
con->list_bucket.next);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
- radix_tree_replace_slot(slot, con->list_bucket.next);
-#else
- *slot = con->list_bucket.next;
-#endif
+ radix_tree_replace_slot(slot,
+ con->list_bucket.next);
}
list_del(&con->list_bucket);
}
put_connection(con);
}
-static void remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void remove_connection(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
{
spin_lock(&msgr->con_lock);
__remove_connection(msgr, con);
{
dout(40, "ceph_queue_write %p\n", con);
atomic_inc(&con->nref);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
if (!queue_work(send_wq, &con->swork.work)) {
-#else
- if (!queue_work(send_wq, &con->swork)) {
-#endif
dout(40, "ceph_queue_write %p - already queued\n", con);
put_connection(con);
}
*/
static void ceph_send_fault(struct ceph_connection *con)
{
- derr(1, "ceph_send_fault %p state %lu to peer %u.%u.%u.%u:%u\n",
+ derr(1, "ceph_send_fault %p state %lu to peer %u.%u.%u.%u:%u\n",
con, con->state,
IPQUADPORT(con->peer_addr.ipaddr));
/* PW if never get here remove */
if (test_bit(WAIT, &con->state)) {
- derr(30, "ceph_send_fault received socket close during WAIT state\n");
+ derr(30, "ceph_send_fault socket close during WAIT state\n");
return;
}
con->sock = NULL;
set_bit(NEW, &con->state);
- /* If there are no messages in the queue, place the connection
+ /* If there are no messages in the queue, place the connection
* in a STANDBY state otherwise retry with delay */
if (list_empty(&con->out_queue)) {
dout(10, "setting STANDBY bit\n");
int ret;
while (con->out_kvec_bytes > 0) {
- ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
+ ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
con->out_kvec_left, con->out_kvec_bytes);
if (ret <= 0) goto out;
con->out_kvec_bytes -= ret;
con->out_kvec_left = 0;
ret = 1;
out:
- dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con,
+ dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con,
con->out_kvec_left, con->out_kvec_bytes, ret);
return ret; /* done! */
}
-static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg)
+static int write_partial_msg_pages(struct ceph_connection *con,
+ struct ceph_msg *msg)
{
struct kvec kv;
int ret;
unsigned data_len = le32_to_cpu(msg->hdr.data_len);
while (con->out_msg_pos.page < con->out_msg->nr_pages) {
- kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos;
- kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
+ kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) +
+ con->out_msg_pos.page_pos;
+ kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len);
if (ret < 0) return ret;
if (ret == kv.iov_len) {
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.page++;
- }
+ }
}
/* done */
*/
static void prepare_write_message(struct ceph_connection *con)
{
- struct ceph_msg *m = list_entry(con->out_queue.next, struct ceph_msg, list_head);
+ struct ceph_msg *m = list_entry(con->out_queue.next,
+ struct ceph_msg, list_head);
/* move to sending/sent list */
list_del(&m->list_head);
con->out_msg = m; /* FIXME: do we want to take a reference here? */
/* encode header */
- dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n",
+ dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n",
m, le64_to_cpu(m->hdr.seq), le32_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len));
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
set_bit(WRITE_PENDING, &con->state);
}
-/*
+/*
* prepare an ack for send
*/
static void prepare_write_ack(struct ceph_connection *con)
set_bit(WRITE_PENDING, &con->state);
}
-static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void prepare_write_connect(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
{
con->out_kvec[0].iov_base = &msgr->inst.addr;
con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
set_bit(WRITE_PENDING, &con->state);
}
-static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void prepare_write_accept_announce(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
{
con->out_kvec[0].iov_base = &msgr->inst.addr;
con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
/*
* worker function when socket is writeable
*/
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
static void try_write(struct work_struct *work)
-#else
-static void try_write(void *arg)
-#endif
{
- struct ceph_connection *con =
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
- container_of(work, struct ceph_connection, swork.work);
-#else
- arg;
-#endif
+ struct ceph_connection *con =
+ container_of(work, struct ceph_connection, swork.work);
struct ceph_messenger *msgr = con->msgr;
int ret = 1;
- dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
+ dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
atomic_read(&con->nref));
if (test_bit(CLOSED, &con->state)) {
con->connect_seq++;
prepare_write_connect(msgr, con);
set_bit(CONNECTING, &con->state);
- dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state);
+ dout(5, "try_write initiating connect on %p new state %lu\n",
+ con, con->state);
ret = ceph_tcp_connect(con);
if (ret < 0) {
derr(1, "try_write tcp connect error %d\n", ret);
if (ret == 0)
goto done;
if (ret < 0) {
- dout(30, "try_write write_partial_kvec returned error %d\n", ret);
+ dout(30, "try_write write_partial_kvec err %d\n", ret);
goto done;
}
}
/* check if connect handshake finished.. if not requeue and return.. */
/*
if (!test_bit(OPEN, &con->state)) {
- dout(5, "try_write state = %lu, need to requeue and exit\n", con->state);
- goto done;
- }
+ dout(5, "try_write state = %lu, need to requeue and exit\n",
+ con->state);
+ goto done;
+ }
*/
-
+
/* msg pages? */
if (con->out_msg) {
ret = write_partial_msg_pages(con, con->out_msg);
- if (ret == 0)
+ if (ret == 0)
goto done;
if (ret < 0) {
- dout(30, "try_write write_partial_msg_pages returned error %d\n", ret);
+ dout(30, "try_write write_partial_msg_pages err %d\n",
+ ret);
goto done;
}
}
-
+
/* anything else pending? */
spin_lock(&con->out_queue_lock);
if (con->in_seq > con->in_seq_acked) {
return;
}
-/*
+/*
* prepare to read a message
*/
static int prepare_read_message(struct ceph_connection *con)
con->in_base_pos = 0;
con->in_msg = ceph_msg_new(0, 0, 0, 0, 0);
if (IS_ERR(con->in_msg)) {
- /* TBD: we don't check for error in caller, handle error here? */
+ /* TBD: we don't check for error in caller, handle it here? */
err = PTR_ERR(con->in_msg);
- con->in_msg = 0;
+ con->in_msg = 0;
derr(1, "kmalloc failure on incoming message %d\n", err);
return err;
}
int ret;
int want, left;
unsigned front_len, data_len, data_off;
-
+
dout(20, "read_message_partial con %p msg %p\n", con, m);
/* header */
while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
left = sizeof(struct ceph_msg_header) - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos, left);
+ ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos,
+ left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
if (con->in_base_pos == sizeof(struct ceph_msg_header))
return -ENOMEM;
}
left = front_len - m->front.iov_len;
- ret = ceph_tcp_recvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left);
+ ret = ceph_tcp_recvmsg(con->sock, (char*)m->front.iov_base +
+ m->front.iov_len, left);
if (ret <= 0) return ret;
m->front.iov_len += ret;
}
/* (page) data */
data_len = le32_to_cpu(m->hdr.data_len);
data_off = le32_to_cpu(m->hdr.data_off);
- if (data_len == 0)
+ if (data_len == 0)
goto done;
if (m->nr_pages == 0) {
con->in_msg_pos.page = 0;
BUG_ON(!con->msgr->prepare_pages);
ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
if (ret < 0) {
- dout(10, "prepare_pages failed, skipping+discarding message\n");
- con->in_base_pos = -data_len; /* ignore rest of message */
+ dout(10, "prepare_pages failed, skipping payload\n");
+ con->in_base_pos = -data_len; /* skip payload */
ceph_msg_put(con->in_msg);
con->in_msg = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
BUG_ON(m->nr_pages != want);
}
/*
- * FIXME: we should discard the data payload if ret
+ * FIXME: we should discard the data payload if ret
*/
}
while (con->in_msg_pos.data_pos < data_len) {
left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- /*dout(10, "data_pos = %d, data_len = %d, page_pos=%d left = %d\n",
- con->in_msg_pos.data_pos, m->hdr.data_len, con->in_msg_pos.page_pos, left);*/
p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
if (ret <= 0) return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
/*
* in practice, we learn our ip from the first incoming mon
- * message, before anyone else knows we exist, so this is
+ * message, before anyone else knows we exist, so this is
* safe.
*/
con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr;
- dout(10, "read_message_partial learned my addr is %u.%u.%u.%u:%u\n",
+ dout(10, "read_message_partial learned my addr is "
+ "%u.%u.%u.%u:%u\n",
IPQUADPORT(con->msgr->inst.addr.ipaddr));
}
}
-/*
+/*
* prepare to read an ack
*/
static void prepare_read_ack(struct ceph_connection *con)
{
while (con->in_base_pos < sizeof(con->in_partial_ack)) {
int left = sizeof(con->in_partial_ack) - con->in_base_pos;
- int ret = ceph_tcp_recvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left);
+ int ret = ceph_tcp_recvmsg(con->sock,
+ (char*)&con->in_partial_ack +
+ con->in_base_pos, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
while (!list_empty(&con->out_sent)) {
m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
seq = le64_to_cpu(m->hdr.seq);
- if (seq > ack)
+ if (seq > ack)
break;
- dout(5, "got ack for seq %llu type %d at %p\n", seq,
+ dout(5, "got ack for seq %llu type %d at %p\n", seq,
le32_to_cpu(m->hdr.type), m);
list_del(&m->list_head);
ceph_msg_put(m);
}
-/*
+/*
* read portion of connect-side handshake on a new connection
*/
static int read_connect_partial(struct ceph_connection *con)
{
int ret, to;
- dout(20, "read_connect_partial %p start at %d\n", con, con->in_base_pos);
+ dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
/* actual_peer_addr */
to = sizeof(con->actual_peer_addr);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, (char*)&con->actual_peer_addr + have, left);
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char*)&con->actual_peer_addr + have,
+ left);
if (ret <= 0) goto out;
con->in_base_pos += ret;
}
if (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(con->in_connect_seq) - left;
- ret = ceph_tcp_recvmsg(con->sock,
- (char*)&con->in_connect_seq + have, left);
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char*)&con->in_connect_seq +
+ have, left);
if (ret <= 0) goto out;
con->in_base_pos += ret;
- }
+ }
}
ret = 1;
out:
- dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret);
- dout(20, "read_connect_partial peer in connect_seq = %u\n", le32_to_cpu(con->in_connect_seq));
+ dout(20, "read_connect_partial %p end at %d ret %d\n", con,
+ con->in_base_pos, ret);
+ dout(20, "read_connect_partial peer in connect_seq = %u\n",
+ le32_to_cpu(con->in_connect_seq));
return ret; /* done */
}
{
dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) {
- derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, got %u.%u.%u.%u:%u/%d, wtf\n",
+ derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, "
+ "got %u.%u.%u.%u:%u/%d, wtf\n",
IPQUADPORT(con->peer_addr.ipaddr),
con->peer_addr.nonce,
IPQUADPORT(con->actual_peer_addr.ipaddr),
}
switch (con->in_tag) {
case CEPH_MSGR_TAG_RESETSESSION:
- dout(10, "process_connect got session RESET peers in_connect_seq %u\n",
+ dout(10, "process_connect got RESET peer seq %u\n",
le32_to_cpu(con->in_connect_seq));
reset_connection(con);
prepare_write_connect(con->msgr, con);
con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
break;
case CEPH_MSGR_TAG_RETRY:
- dout(10,
- "process_connect got session RETRY connect_seq = %u, in_connect_seq = %u\n",
- le32_to_cpu(con->out_connect_seq), le32_to_cpu(con->in_connect_seq));
+ dout(10,
+ "process_connect got RETRY my seq = %u, peer_seq = %u\n",
+ le32_to_cpu(con->out_connect_seq),
+ le32_to_cpu(con->in_connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect_seq);
- prepare_write_connect(con->msgr, con);
+ prepare_write_connect(con->msgr, con);
break;
case CEPH_MSGR_TAG_WAIT:
dout(10, "process_connect peer connecting WAIT\n");
set_bit(OPEN, &con->state);
break;
default:
- derr(1, "process_connect protocol error, try connecting again in a bit\n");
+ derr(1, "process_connect protocol error, will retry\n");
con->delay = 10 * HZ; /* maybe use default.. */
ceph_send_fault(con);
}
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_addr + have, left);
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char*)&con->peer_addr + have, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(con->peer_addr) - left;
- ret = ceph_tcp_recvmsg(con->sock, (char*)&con->in_connect_seq + have, left);
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char*)&con->in_connect_seq + have,
+ left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
/*
* replace another connection
- * (old and new should be for the _same_ peer,
+ * (old and new should be for the _same_ peer,
* and thus in the same pos in the radix tree)
*/
-static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
+static void __replace_connection(struct ceph_messenger *msgr,
+ struct ceph_connection *old,
+ struct ceph_connection *new)
{
clear_bit(OPEN, &old->state);
reset_connection(existing);
/* replace connection */
__replace_connection(msgr, existing, con);
- con->msgr->peer_reset(con->msgr->parent,
+ con->msgr->peer_reset(con->msgr->parent,
&con->peer_name);
} else {
/* old attempt or peer didn't get the READY */
prepare_write_accept_retry(con, &tag_retry);
}
} else if (peer_cseq == existing->connect_seq &&
- (test_bit(CONNECTING, &existing->state) ||
+ (test_bit(CONNECTING, &existing->state) ||
test_bit(WAIT, &existing->state))) {
/* connection race */
dout(20, "process_accept connection race state = %lu\n",
con->state);
- if (ceph_entity_addr_equal(&msgr->inst.addr,
+ if (ceph_entity_addr_equal(&msgr->inst.addr,
&con->peer_addr)) {
/* incoming connection wins.. */
/* replace existing with new connection */
- __replace_connection(msgr, existing, con);
+ __replace_connection(msgr, existing, con);
} else {
/* our existing outgoing connection wins..
- tell peer to wait for our outgoing
+ tell peer to wait for our outgoing
connection to go through */
prepare_write_accept_reply(con, &tag_wait);
}
- } else if (existing->connect_seq == 0 &&
+ } else if (existing->connect_seq == 0 &&
(peer_cseq > existing->connect_seq)) {
/* we reset and already reconnecting */
prepare_write_accept_reply(con, &tag_reset);
}
put_connection(existing);
} else if (peer_cseq > 0) {
- dout(20, "process_accept no existing connection, connection reset\n");
+ dout(20, "process_accept no existing connection, we reset\n");
prepare_write_accept_reply(con, &tag_reset);
} else {
- dout(20, "process_accept no existing connection, connection now OPEN\n");
+ dout(20, "process_accept no existing connection, opening\n");
__add_connection(msgr, con);
- set_bit(OPEN, &con->state);
+ set_bit(OPEN, &con->state);
con->connect_seq = peer_cseq + 1;
prepare_write_accept_reply(con, &tag_ready);
}
/*
* worker function when data is available on the socket
*/
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
static void try_read(struct work_struct *work)
-#else
-static void try_read(void *arg)
-#endif
{
int ret = -1;
struct ceph_connection *con;
struct ceph_messenger *msgr;
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
con = container_of(work, struct ceph_connection, rwork);
-#else
- con = arg;
-#endif
dout(20, "try_read start on %p\n", con);
msgr = con->msgr;
if (con->in_base_pos < 0) {
/* skipping + discarding content */
static char buf[1024];
- ret = ceph_tcp_recvmsg(con->sock, buf,
+ ret = ceph_tcp_recvmsg(con->sock, buf,
min(1024, -con->in_base_pos));
if (ret <= 0) goto done;
con->in_base_pos += ret;
ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
if (ret <= 0) goto done;
dout(30, "try_read got tag %d\n", (int)con->in_tag);
- if (con->in_tag == CEPH_MSGR_TAG_MSG)
+ if (con->in_tag == CEPH_MSGR_TAG_MSG)
prepare_read_message(con);
else if (con->in_tag == CEPH_MSGR_TAG_ACK)
prepare_read_ack(con);
ret = read_message_partial(con);
if (ret <= 0) goto done;
- dout(1, "===== %p from %s%d %d=%s len %d+%d =====\n", con->in_msg,
- ceph_name_type_str(le32_to_cpu(con->in_msg->hdr.src.name.type)),
+ dout(1, "===== %p from %s%d %d=%s len %d+%d =====\n",
+ con->in_msg,
+ ceph_name_type_str(le32_to_cpu(con->in_msg->hdr.src.name.type)),
le32_to_cpu(con->in_msg->hdr.src.name.num),
- le32_to_cpu(con->in_msg->hdr.type),
+ le32_to_cpu(con->in_msg->hdr.type),
ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)),
- le32_to_cpu(con->in_msg->hdr.front_len),
+ le32_to_cpu(con->in_msg->hdr.front_len),
le32_to_cpu(con->in_msg->hdr.data_len));
- msgr->dispatch(con->msgr->parent, con->in_msg); /* fixme: use a workqueue */
+ msgr->dispatch(con->msgr->parent, con->in_msg);
con->in_msg = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
goto more;
/*
* worker function when listener receives a connect
*/
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
static void try_accept(struct work_struct *work)
-#else
-static void try_accept(void *arg)
-#endif
{
struct ceph_connection *new_con = NULL;
struct ceph_messenger *msgr;
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
msgr = container_of(work, struct ceph_messenger, awork);
-#else
- msgr = arg;
-#endif
- dout(5, "Entered try_accept\n");
+ dout(5, "Entered try_accept\n");
/* initialize the msgr connection */
new_con = new_connection(msgr);
if (new_con == NULL) {
- derr(1, "malloc failure\n");
+ derr(1, "malloc failure\n");
goto done;
}
if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
- derr(1, "error accepting connection\n");
+ derr(1, "error accepting connection\n");
put_connection(new_con);
- goto done;
- }
+ goto done;
+ }
dout(5, "accepted connection \n");
new_con->in_tag = CEPH_MSGR_TAG_READY;
add_connection_accepting(msgr, new_con);
/*
- * hand off to worker threads ,should be able to write, we want to
+ * hand off to worker threads ,should be able to write, we want to
* try to write right away, we may have missed socket state change
*/
ceph_queue_write(new_con);
done:
- return;
+ return;
}
/*
*/
struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
{
- struct ceph_messenger *msgr;
+ struct ceph_messenger *msgr;
int ret = 0;
- msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
- if (msgr == NULL)
+ msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
+ if (msgr == NULL)
return ERR_PTR(-ENOMEM);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
INIT_WORK(&msgr->awork, try_accept);
-#else
- INIT_WORK(&msgr->awork, try_accept, msgr);
-#endif
spin_lock_init(&msgr->con_lock);
INIT_LIST_HEAD(&msgr->con_all);
INIT_LIST_HEAD(&msgr->con_accepting);
- INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC); /* we insert under spinlock */
+ INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC);
/* pick listening address */
if (myaddr) {
msgr->inst.addr.ipaddr.sin_port = htons(0); /* any port */
}
msgr->inst.addr.ipaddr.sin_family = AF_INET;
-
+
/* create listening socket */
ret = ceph_tcp_listen(msgr);
if (ret < 0) {
kfree(msgr);
return ERR_PTR(ret);
}
- if (myaddr)
+ if (myaddr)
msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr;
dout(1, "create %p listening on %x:%d\n", msgr,
- ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr),
+ ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr),
ntohs(msgr->inst.addr.ipaddr.sin_port));
return msgr;
}
spin_lock(&msgr->con_lock);
while (!list_empty(&msgr->con_all)) {
dout(1, "con_all isn't empty\n");
- con = list_entry(msgr->con_all.next, struct ceph_connection, list_all);
+ con = list_entry(msgr->con_all.next, struct ceph_connection,
+ list_all);
__remove_connection(msgr, con);
}
spin_unlock(&msgr->con_lock);
/*
* mark a peer down. drop any open connection.
*/
-void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
+void ceph_messenger_mark_down(struct ceph_messenger *msgr,
+ struct ceph_entity_addr *addr)
{
struct ceph_connection *con;
con = __get_connection(msgr, addr);
if (con) {
dout(10, "mark_down dropping %p\n", con);
- set_bit(CLOSED, &con->state); /* in case there is queued work */
+ set_bit(CLOSED, &con->state); /* in case there's queued work */
__remove_connection(msgr, con);
put_connection(con);
}
*
* will take+drop msgr, then connection locks.
*/
-int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned long timeout)
+int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
+ unsigned long timeout)
{
struct ceph_connection *con, *newcon;
int ret = 0;
-
+
/* set source */
msg->hdr.src = msgr->inst;
con = __get_connection(msgr, &msg->hdr.dst.addr);
if (con) {
put_connection(newcon);
- dout(10, "ceph_msg_send (lost race and) had connection %p to peer %u.%u.%u.%u:%u\n", con,
+ dout(10, "ceph_msg_send (lost race and) had connection "
+ "%p to peer %u.%u.%u.%u:%u\n", con,
IPQUADPORT(msg->hdr.dst.addr.ipaddr));
} else {
con = newcon;
con->peer_addr = msg->hdr.dst.addr;
con->peer_name = msg->hdr.dst.name;
__add_connection(msgr, con);
- dout(5, "ceph_msg_send new connection %p to peer %u.%u.%u.%u:%u\n", con,
+ dout(5, "ceph_msg_send new connection %p to peer "
+ "%u.%u.%u.%u:%u\n", con,
IPQUADPORT(msg->hdr.dst.addr.ipaddr));
}
} else {
- dout(10, "ceph_msg_send had connection %p to peer %u.%u.%u.%u:%u\n", con,
+ dout(10, "ceph_msg_send had connection %p to peer "
+ "%u.%u.%u.%u:%u\n", con,
IPQUADPORT(msg->hdr.dst.addr.ipaddr));
}
spin_unlock(&msgr->con_lock);
con->delay = timeout;
- dout(10, "ceph_msg_send delay = %lu\n", con->delay);
+ dout(10, "ceph_msg_send delay = %lu\n", con->delay);
/* queue */
spin_lock(&con->out_queue_lock);
msg->hdr.seq = cpu_to_le64(++con->out_seq);
dout(1, "----- %p to %s%d %d=%s len %d+%d -----\n", msg,
- ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)),
+ ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)),
le32_to_cpu(msg->hdr.dst.name.num),
le32_to_cpu(msg->hdr.type),
ceph_msg_type_name(le32_to_cpu(msg->hdr.type)),
- le32_to_cpu(msg->hdr.front_len),
+ le32_to_cpu(msg->hdr.front_len),
le32_to_cpu(msg->hdr.data_len));
- dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg,
+ dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg,
le64_to_cpu(msg->hdr.seq),
- ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)),
+ ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)),
le32_to_cpu(msg->hdr.dst.name.num), con);
ceph_msg_get(msg);
list_add_tail(&msg->list_head, &con->out_queue);
spin_unlock(&con->out_queue_lock);
-
- if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+
+ if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
ceph_queue_write(con);
put_connection(con);
}
-/*
+/*
* construct a new message with given type, size
*/
-struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages)
+struct ceph_msg *ceph_msg_new(int type, int front_len,
+ int page_len, int page_off, struct page **pages)
{
struct ceph_msg *m;
if (atomic_dec_and_test(&m->nref)) {
dout(30, "ceph_msg_put last one on %p\n", m);
BUG_ON(!list_empty(&m->list_head));
- if (m->front.iov_base)
+ if (m->front.iov_base)
kfree(m->front.iov_base);
kfree(m);
}