From: Sage Weil Date: Tue, 1 Apr 2008 22:13:45 +0000 (-0700) Subject: kmsgr: some formatting and style cleanup, and removed some legacy support X-Git-Tag: v0.2~229^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5d315111240460f14f2b4f591c877ddab531cc98;p=ceph.git kmsgr: some formatting and style cleanup, and removed some legacy support --- diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 991bc158952..1dcaa09d57e 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -10,7 +10,7 @@ int ceph_debug_msgr = 50; #define DOUT_VAR ceph_debug_msgr -#define DOUT_PREFIX "msgr: " +#define DOUT_PREFIX "msgr: " #include "super.h" /* static tag bytes */ @@ -21,15 +21,9 @@ static char tag_wait = CEPH_MSGR_TAG_WAIT; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) static void try_read(struct work_struct *); static void try_write(struct work_struct *); static void try_accept(struct work_struct *); -#else -static void try_read(void *); -static void try_write(void *); -static void try_accept(void *); -#endif @@ -37,14 +31,14 @@ static void try_accept(void *); * connections */ -/* +/* * create a new connection. initial state is NEW. */ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) { struct ceph_connection *con; con = kzalloc(sizeof(struct ceph_connection), GFP_KERNEL); - if (con == NULL) + if (con == NULL) return NULL; con->msgr = msgr; @@ -59,13 +53,8 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) INIT_WORK(&con->rwork, try_read); INIT_DELAYED_WORK(&con->swork, try_write); -#else - INIT_WORK(&con->rwork, try_read, con); - INIT_WORK(&con->swork, try_write, con); -#endif return con; } @@ -77,7 +66,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) * table. in the rare event that the trivial hash collides, we just * traverse the (short) list. */ -static unsigned long hash_addr(struct ceph_entity_addr *addr) +static unsigned long hash_addr(struct ceph_entity_addr *addr) { unsigned long key; key = *(__u32*)&addr->ipaddr.sin_addr.s_addr; @@ -85,10 +74,11 @@ static unsigned long hash_addr(struct ceph_entity_addr *addr) return key; } -/* +/* * get an existing connection, if any, for given addr */ -static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) +static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, + struct ceph_entity_addr *addr) { struct ceph_connection *con = NULL; struct list_head *head, *p; @@ -96,7 +86,7 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, str /* existing? */ head = radix_tree_lookup(&msgr->con_open, key); - if (head == NULL) + if (head == NULL) goto out; con = list_entry(head, struct ceph_connection, list_bucket); if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) { @@ -117,10 +107,10 @@ out: -/* +/* * drop a reference */ -static void put_connection(struct ceph_connection *con) +static void put_connection(struct ceph_connection *con) { dout(20, "put_connection nref = %d\n", atomic_read(&con->nref)); if (atomic_dec_and_test(&con->nref)) { @@ -131,10 +121,11 @@ static void put_connection(struct ceph_connection *con) } } -/* +/* * add to connections tree */ -static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection *con) +static void __add_connection(struct ceph_messenger *msgr, + struct ceph_connection *con) { struct list_head *head; unsigned long key = hash_addr(&con->peer_addr); @@ -151,16 +142,19 @@ static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection head = radix_tree_lookup(&msgr->con_open, key); if (head) { - dout(20, "add_connection %p in existing bucket %lu head %p\n", con, key, head); + dout(20, "add_connection %p in existing bucket %lu head %p\n", + con, key, head); list_add(&con->list_bucket, head); } else { - dout(20, "add_connection %p in new bucket %lu head %p\n", con, key, &con->list_bucket); + dout(20, "add_connection %p in new bucket %lu head %p\n", con, + key, &con->list_bucket); INIT_LIST_HEAD(&con->list_bucket); /* empty */ radix_tree_insert(&msgr->con_open, key, &con->list_bucket); } } -static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con) +static void add_connection_accepting(struct ceph_messenger *msgr, + struct ceph_connection *con) { atomic_inc(&con->nref); spin_lock(&msgr->con_lock); @@ -173,7 +167,8 @@ static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_co * remove connection from all list. * also, from con_open radix tree, if it should have been there */ -static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con) +static void __remove_connection(struct ceph_messenger *msgr, + struct ceph_connection *con) { unsigned long key; void **slot, *val; @@ -190,25 +185,20 @@ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connect key = hash_addr(&con->peer_addr); if (list_empty(&con->list_bucket)) { /* last one */ - dout(20, "__remove_connection %p and removing bucket %lu\n", con, key); + dout(20, "__remove_connection %p and bucket %lu\n", + con, key); radix_tree_delete(&msgr->con_open, key); } else { slot = radix_tree_lookup_slot(&msgr->con_open, key); -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) val = radix_tree_deref_slot(slot); -#else - val = *slot; -#endif - dout(20, "__remove_connection %p from bucket %lu head %p\n", con, key, val); + dout(20, "__remove_connection %p from bucket %lu " + "head %p\n", con, key, val); if (val == &con->list_bucket) { - dout(20, "__remove_connection adjusting bucket ptr" - " for %lu to next item, %p\n", key, + dout(20, "__remove_connection adjusting bucket" + " for %lu to next item, %p\n", key, con->list_bucket.next); -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) - radix_tree_replace_slot(slot, con->list_bucket.next); -#else - *slot = con->list_bucket.next; -#endif + radix_tree_replace_slot(slot, + con->list_bucket.next); } list_del(&con->list_bucket); } @@ -216,7 +206,8 @@ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connect put_connection(con); } -static void remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con) +static void remove_connection(struct ceph_messenger *msgr, + struct ceph_connection *con) { spin_lock(&msgr->con_lock); __remove_connection(msgr, con); @@ -231,11 +222,7 @@ void ceph_queue_write(struct ceph_connection *con) { dout(40, "ceph_queue_write %p\n", con); atomic_inc(&con->nref); -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) if (!queue_work(send_wq, &con->swork.work)) { -#else - if (!queue_work(send_wq, &con->swork)) { -#endif dout(40, "ceph_queue_write %p - already queued\n", con); put_connection(con); } @@ -271,13 +258,13 @@ void ceph_queue_read(struct ceph_connection *con) */ static void ceph_send_fault(struct ceph_connection *con) { - derr(1, "ceph_send_fault %p state %lu to peer %u.%u.%u.%u:%u\n", + derr(1, "ceph_send_fault %p state %lu to peer %u.%u.%u.%u:%u\n", con, con->state, IPQUADPORT(con->peer_addr.ipaddr)); /* PW if never get here remove */ if (test_bit(WAIT, &con->state)) { - derr(30, "ceph_send_fault received socket close during WAIT state\n"); + derr(30, "ceph_send_fault socket close during WAIT state\n"); return; } @@ -288,7 +275,7 @@ static void ceph_send_fault(struct ceph_connection *con) con->sock = NULL; set_bit(NEW, &con->state); - /* If there are no messages in the queue, place the connection + /* If there are no messages in the queue, place the connection * in a STANDBY state otherwise retry with delay */ if (list_empty(&con->out_queue)) { dout(10, "setting STANDBY bit\n"); @@ -338,7 +325,7 @@ static int write_partial_kvec(struct ceph_connection *con) int ret; while (con->out_kvec_bytes > 0) { - ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, + ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes); if (ret <= 0) goto out; con->out_kvec_bytes -= ret; @@ -359,20 +346,22 @@ static int write_partial_kvec(struct ceph_connection *con) con->out_kvec_left = 0; ret = 1; out: - dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con, + dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con, con->out_kvec_left, con->out_kvec_bytes, ret); return ret; /* done! */ } -static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg) +static int write_partial_msg_pages(struct ceph_connection *con, + struct ceph_msg *msg) { struct kvec kv; int ret; unsigned data_len = le32_to_cpu(msg->hdr.data_len); while (con->out_msg_pos.page < con->out_msg->nr_pages) { - kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos; - kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), + kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + + con->out_msg_pos.page_pos; + kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), (int)(data_len - con->out_msg_pos.data_pos)); ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len); if (ret < 0) return ret; @@ -382,7 +371,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg if (ret == kv.iov_len) { con->out_msg_pos.page_pos = 0; con->out_msg_pos.page++; - } + } } /* done */ @@ -396,7 +385,8 @@ static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg */ static void prepare_write_message(struct ceph_connection *con) { - struct ceph_msg *m = list_entry(con->out_queue.next, struct ceph_msg, list_head); + struct ceph_msg *m = list_entry(con->out_queue.next, + struct ceph_msg, list_head); /* move to sending/sent list */ list_del(&m->list_head); @@ -404,7 +394,7 @@ static void prepare_write_message(struct ceph_connection *con) con->out_msg = m; /* FIXME: do we want to take a reference here? */ /* encode header */ - dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n", + dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n", m, le64_to_cpu(m->hdr.seq), le32_to_cpu(m->hdr.type), le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len)); BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); @@ -427,7 +417,7 @@ static void prepare_write_message(struct ceph_connection *con) set_bit(WRITE_PENDING, &con->state); } -/* +/* * prepare an ack for send */ static void prepare_write_ack(struct ceph_connection *con) @@ -445,7 +435,8 @@ static void prepare_write_ack(struct ceph_connection *con) set_bit(WRITE_PENDING, &con->state); } -static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con) +static void prepare_write_connect(struct ceph_messenger *msgr, + struct ceph_connection *con) { con->out_kvec[0].iov_base = &msgr->inst.addr; con->out_kvec[0].iov_len = sizeof(msgr->inst.addr); @@ -458,7 +449,8 @@ static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_conne set_bit(WRITE_PENDING, &con->state); } -static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con) +static void prepare_write_accept_announce(struct ceph_messenger *msgr, + struct ceph_connection *con) { con->out_kvec[0].iov_base = &msgr->inst.addr; con->out_kvec[0].iov_len = sizeof(msgr->inst.addr); @@ -494,22 +486,14 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag) /* * worker function when socket is writeable */ -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) static void try_write(struct work_struct *work) -#else -static void try_write(void *arg) -#endif { - struct ceph_connection *con = -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) - container_of(work, struct ceph_connection, swork.work); -#else - arg; -#endif + struct ceph_connection *con = + container_of(work, struct ceph_connection, swork.work); struct ceph_messenger *msgr = con->msgr; int ret = 1; - dout(30, "try_write start %p state %lu nref %d\n", con, con->state, + dout(30, "try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref)); if (test_bit(CLOSED, &con->state)) { @@ -530,7 +514,8 @@ more: con->connect_seq++; prepare_write_connect(msgr, con); set_bit(CONNECTING, &con->state); - dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state); + dout(5, "try_write initiating connect on %p new state %lu\n", + con, con->state); ret = ceph_tcp_connect(con); if (ret < 0) { derr(1, "try_write tcp connect error %d\n", ret); @@ -545,7 +530,7 @@ more: if (ret == 0) goto done; if (ret < 0) { - dout(30, "try_write write_partial_kvec returned error %d\n", ret); + dout(30, "try_write write_partial_kvec err %d\n", ret); goto done; } } @@ -553,22 +538,24 @@ more: /* check if connect handshake finished.. if not requeue and return.. */ /* if (!test_bit(OPEN, &con->state)) { - dout(5, "try_write state = %lu, need to requeue and exit\n", con->state); - goto done; - } + dout(5, "try_write state = %lu, need to requeue and exit\n", + con->state); + goto done; + } */ - + /* msg pages? */ if (con->out_msg) { ret = write_partial_msg_pages(con, con->out_msg); - if (ret == 0) + if (ret == 0) goto done; if (ret < 0) { - dout(30, "try_write write_partial_msg_pages returned error %d\n", ret); + dout(30, "try_write write_partial_msg_pages err %d\n", + ret); goto done; } } - + /* anything else pending? */ spin_lock(&con->out_queue_lock); if (con->in_seq > con->in_seq_acked) { @@ -592,7 +579,7 @@ done: return; } -/* +/* * prepare to read a message */ static int prepare_read_message(struct ceph_connection *con) @@ -603,9 +590,9 @@ static int prepare_read_message(struct ceph_connection *con) con->in_base_pos = 0; con->in_msg = ceph_msg_new(0, 0, 0, 0, 0); if (IS_ERR(con->in_msg)) { - /* TBD: we don't check for error in caller, handle error here? */ + /* TBD: we don't check for error in caller, handle it here? */ err = PTR_ERR(con->in_msg); - con->in_msg = 0; + con->in_msg = 0; derr(1, "kmalloc failure on incoming message %d\n", err); return err; } @@ -622,13 +609,14 @@ static int read_message_partial(struct ceph_connection *con) int ret; int want, left; unsigned front_len, data_len, data_off; - + dout(20, "read_message_partial con %p msg %p\n", con, m); /* header */ while (con->in_base_pos < sizeof(struct ceph_msg_header)) { left = sizeof(struct ceph_msg_header) - con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos, left); + ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos, + left); if (ret <= 0) return ret; con->in_base_pos += ret; if (con->in_base_pos == sizeof(struct ceph_msg_header)) @@ -644,7 +632,8 @@ static int read_message_partial(struct ceph_connection *con) return -ENOMEM; } left = front_len - m->front.iov_len; - ret = ceph_tcp_recvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left); + ret = ceph_tcp_recvmsg(con->sock, (char*)m->front.iov_base + + m->front.iov_len, left); if (ret <= 0) return ret; m->front.iov_len += ret; } @@ -652,7 +641,7 @@ static int read_message_partial(struct ceph_connection *con) /* (page) data */ data_len = le32_to_cpu(m->hdr.data_len); data_off = le32_to_cpu(m->hdr.data_off); - if (data_len == 0) + if (data_len == 0) goto done; if (m->nr_pages == 0) { con->in_msg_pos.page = 0; @@ -664,8 +653,8 @@ static int read_message_partial(struct ceph_connection *con) BUG_ON(!con->msgr->prepare_pages); ret = con->msgr->prepare_pages(con->msgr->parent, m, want); if (ret < 0) { - dout(10, "prepare_pages failed, skipping+discarding message\n"); - con->in_base_pos = -data_len; /* ignore rest of message */ + dout(10, "prepare_pages failed, skipping payload\n"); + con->in_base_pos = -data_len; /* skip payload */ ceph_msg_put(con->in_msg); con->in_msg = 0; con->in_tag = CEPH_MSGR_TAG_READY; @@ -674,16 +663,15 @@ static int read_message_partial(struct ceph_connection *con) BUG_ON(m->nr_pages != want); } /* - * FIXME: we should discard the data payload if ret + * FIXME: we should discard the data payload if ret */ } while (con->in_msg_pos.data_pos < data_len) { left = min((int)(data_len - con->in_msg_pos.data_pos), (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); - /*dout(10, "data_pos = %d, data_len = %d, page_pos=%d left = %d\n", - con->in_msg_pos.data_pos, m->hdr.data_len, con->in_msg_pos.page_pos, left);*/ p = kmap(m->pages[con->in_msg_pos.page]); - ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left); + ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, + left); if (ret <= 0) return ret; con->in_msg_pos.data_pos += ret; con->in_msg_pos.page_pos += ret; @@ -700,11 +688,12 @@ done: if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { /* * in practice, we learn our ip from the first incoming mon - * message, before anyone else knows we exist, so this is + * message, before anyone else knows we exist, so this is * safe. */ con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr; - dout(10, "read_message_partial learned my addr is %u.%u.%u.%u:%u\n", + dout(10, "read_message_partial learned my addr is " + "%u.%u.%u.%u:%u\n", IPQUADPORT(con->msgr->inst.addr.ipaddr)); } @@ -712,7 +701,7 @@ done: } -/* +/* * prepare to read an ack */ static void prepare_read_ack(struct ceph_connection *con) @@ -728,7 +717,9 @@ static int read_ack_partial(struct ceph_connection *con) { while (con->in_base_pos < sizeof(con->in_partial_ack)) { int left = sizeof(con->in_partial_ack) - con->in_base_pos; - int ret = ceph_tcp_recvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left); + int ret = ceph_tcp_recvmsg(con->sock, + (char*)&con->in_partial_ack + + con->in_base_pos, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -742,9 +733,9 @@ static void process_ack(struct ceph_connection *con, __u32 ack) while (!list_empty(&con->out_sent)) { m = list_entry(con->out_sent.next, struct ceph_msg, list_head); seq = le64_to_cpu(m->hdr.seq); - if (seq > ack) + if (seq > ack) break; - dout(5, "got ack for seq %llu type %d at %p\n", seq, + dout(5, "got ack for seq %llu type %d at %p\n", seq, le32_to_cpu(m->hdr.type), m); list_del(&m->list_head); ceph_msg_put(m); @@ -752,20 +743,22 @@ static void process_ack(struct ceph_connection *con, __u32 ack) } -/* +/* * read portion of connect-side handshake on a new connection */ static int read_connect_partial(struct ceph_connection *con) { int ret, to; - dout(20, "read_connect_partial %p start at %d\n", con, con->in_base_pos); + dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos); /* actual_peer_addr */ to = sizeof(con->actual_peer_addr); while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, (char*)&con->actual_peer_addr + have, left); + ret = ceph_tcp_recvmsg(con->sock, + (char*)&con->actual_peer_addr + have, + left); if (ret <= 0) goto out; con->in_base_pos += ret; } @@ -784,16 +777,19 @@ static int read_connect_partial(struct ceph_connection *con) if (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = sizeof(con->in_connect_seq) - left; - ret = ceph_tcp_recvmsg(con->sock, - (char*)&con->in_connect_seq + have, left); + ret = ceph_tcp_recvmsg(con->sock, + (char*)&con->in_connect_seq + + have, left); if (ret <= 0) goto out; con->in_base_pos += ret; - } + } } ret = 1; out: - dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret); - dout(20, "read_connect_partial peer in connect_seq = %u\n", le32_to_cpu(con->in_connect_seq)); + dout(20, "read_connect_partial %p end at %d ret %d\n", con, + con->in_base_pos, ret); + dout(20, "read_connect_partial peer in connect_seq = %u\n", + le32_to_cpu(con->in_connect_seq)); return ret; /* done */ } @@ -821,7 +817,8 @@ static void process_connect(struct ceph_connection *con) { dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag); if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) { - derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, got %u.%u.%u.%u:%u/%d, wtf\n", + derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, " + "got %u.%u.%u.%u:%u/%d, wtf\n", IPQUADPORT(con->peer_addr.ipaddr), con->peer_addr.nonce, IPQUADPORT(con->actual_peer_addr.ipaddr), @@ -832,18 +829,19 @@ static void process_connect(struct ceph_connection *con) } switch (con->in_tag) { case CEPH_MSGR_TAG_RESETSESSION: - dout(10, "process_connect got session RESET peers in_connect_seq %u\n", + dout(10, "process_connect got RESET peer seq %u\n", le32_to_cpu(con->in_connect_seq)); reset_connection(con); prepare_write_connect(con->msgr, con); con->msgr->peer_reset(con->msgr->parent, &con->peer_name); break; case CEPH_MSGR_TAG_RETRY: - dout(10, - "process_connect got session RETRY connect_seq = %u, in_connect_seq = %u\n", - le32_to_cpu(con->out_connect_seq), le32_to_cpu(con->in_connect_seq)); + dout(10, + "process_connect got RETRY my seq = %u, peer_seq = %u\n", + le32_to_cpu(con->out_connect_seq), + le32_to_cpu(con->in_connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect_seq); - prepare_write_connect(con->msgr, con); + prepare_write_connect(con->msgr, con); break; case CEPH_MSGR_TAG_WAIT: dout(10, "process_connect peer connecting WAIT\n"); @@ -858,7 +856,7 @@ static void process_connect(struct ceph_connection *con) set_bit(OPEN, &con->state); break; default: - derr(1, "process_connect protocol error, try connecting again in a bit\n"); + derr(1, "process_connect protocol error, will retry\n"); con->delay = 10 * HZ; /* maybe use default.. */ ceph_send_fault(con); } @@ -881,7 +879,8 @@ static int read_accept_partial(struct ceph_connection *con) while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = con->in_base_pos; - ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_addr + have, left); + ret = ceph_tcp_recvmsg(con->sock, + (char*)&con->peer_addr + have, left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -891,7 +890,9 @@ static int read_accept_partial(struct ceph_connection *con) while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = sizeof(con->peer_addr) - left; - ret = ceph_tcp_recvmsg(con->sock, (char*)&con->in_connect_seq + have, left); + ret = ceph_tcp_recvmsg(con->sock, + (char*)&con->in_connect_seq + have, + left); if (ret <= 0) return ret; con->in_base_pos += ret; } @@ -900,10 +901,12 @@ static int read_accept_partial(struct ceph_connection *con) /* * replace another connection - * (old and new should be for the _same_ peer, + * (old and new should be for the _same_ peer, * and thus in the same pos in the radix tree) */ -static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new) +static void __replace_connection(struct ceph_messenger *msgr, + struct ceph_connection *old, + struct ceph_connection *new) { clear_bit(OPEN, &old->state); @@ -951,7 +954,7 @@ static void process_accept(struct ceph_connection *con) reset_connection(existing); /* replace connection */ __replace_connection(msgr, existing, con); - con->msgr->peer_reset(con->msgr->parent, + con->msgr->peer_reset(con->msgr->parent, &con->peer_name); } else { /* old attempt or peer didn't get the READY */ @@ -960,23 +963,23 @@ static void process_accept(struct ceph_connection *con) prepare_write_accept_retry(con, &tag_retry); } } else if (peer_cseq == existing->connect_seq && - (test_bit(CONNECTING, &existing->state) || + (test_bit(CONNECTING, &existing->state) || test_bit(WAIT, &existing->state))) { /* connection race */ dout(20, "process_accept connection race state = %lu\n", con->state); - if (ceph_entity_addr_equal(&msgr->inst.addr, + if (ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) { /* incoming connection wins.. */ /* replace existing with new connection */ - __replace_connection(msgr, existing, con); + __replace_connection(msgr, existing, con); } else { /* our existing outgoing connection wins.. - tell peer to wait for our outgoing + tell peer to wait for our outgoing connection to go through */ prepare_write_accept_reply(con, &tag_wait); } - } else if (existing->connect_seq == 0 && + } else if (existing->connect_seq == 0 && (peer_cseq > existing->connect_seq)) { /* we reset and already reconnecting */ prepare_write_accept_reply(con, &tag_reset); @@ -986,12 +989,12 @@ static void process_accept(struct ceph_connection *con) } put_connection(existing); } else if (peer_cseq > 0) { - dout(20, "process_accept no existing connection, connection reset\n"); + dout(20, "process_accept no existing connection, we reset\n"); prepare_write_accept_reply(con, &tag_reset); } else { - dout(20, "process_accept no existing connection, connection now OPEN\n"); + dout(20, "process_accept no existing connection, opening\n"); __add_connection(msgr, con); - set_bit(OPEN, &con->state); + set_bit(OPEN, &con->state); con->connect_seq = peer_cseq + 1; prepare_write_accept_reply(con, &tag_ready); } @@ -1004,21 +1007,13 @@ static void process_accept(struct ceph_connection *con) /* * worker function when data is available on the socket */ -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) static void try_read(struct work_struct *work) -#else -static void try_read(void *arg) -#endif { int ret = -1; struct ceph_connection *con; struct ceph_messenger *msgr; -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) con = container_of(work, struct ceph_connection, rwork); -#else - con = arg; -#endif dout(20, "try_read start on %p\n", con); msgr = con->msgr; @@ -1052,7 +1047,7 @@ more: if (con->in_base_pos < 0) { /* skipping + discarding content */ static char buf[1024]; - ret = ceph_tcp_recvmsg(con->sock, buf, + ret = ceph_tcp_recvmsg(con->sock, buf, min(1024, -con->in_base_pos)); if (ret <= 0) goto done; con->in_base_pos += ret; @@ -1062,7 +1057,7 @@ more: ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); if (ret <= 0) goto done; dout(30, "try_read got tag %d\n", (int)con->in_tag); - if (con->in_tag == CEPH_MSGR_TAG_MSG) + if (con->in_tag == CEPH_MSGR_TAG_MSG) prepare_read_message(con); else if (con->in_tag == CEPH_MSGR_TAG_ACK) prepare_read_ack(con); @@ -1077,14 +1072,15 @@ more: ret = read_message_partial(con); if (ret <= 0) goto done; - dout(1, "===== %p from %s%d %d=%s len %d+%d =====\n", con->in_msg, - ceph_name_type_str(le32_to_cpu(con->in_msg->hdr.src.name.type)), + dout(1, "===== %p from %s%d %d=%s len %d+%d =====\n", + con->in_msg, + ceph_name_type_str(le32_to_cpu(con->in_msg->hdr.src.name.type)), le32_to_cpu(con->in_msg->hdr.src.name.num), - le32_to_cpu(con->in_msg->hdr.type), + le32_to_cpu(con->in_msg->hdr.type), ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)), - le32_to_cpu(con->in_msg->hdr.front_len), + le32_to_cpu(con->in_msg->hdr.front_len), le32_to_cpu(con->in_msg->hdr.data_len)); - msgr->dispatch(con->msgr->parent, con->in_msg); /* fixme: use a workqueue */ + msgr->dispatch(con->msgr->parent, con->in_msg); con->in_msg = 0; con->in_tag = CEPH_MSGR_TAG_READY; goto more; @@ -1117,34 +1113,26 @@ out: /* * worker function when listener receives a connect */ -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) static void try_accept(struct work_struct *work) -#else -static void try_accept(void *arg) -#endif { struct ceph_connection *new_con = NULL; struct ceph_messenger *msgr; -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) msgr = container_of(work, struct ceph_messenger, awork); -#else - msgr = arg; -#endif - dout(5, "Entered try_accept\n"); + dout(5, "Entered try_accept\n"); /* initialize the msgr connection */ new_con = new_connection(msgr); if (new_con == NULL) { - derr(1, "malloc failure\n"); + derr(1, "malloc failure\n"); goto done; } if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) { - derr(1, "error accepting connection\n"); + derr(1, "error accepting connection\n"); put_connection(new_con); - goto done; - } + goto done; + } dout(5, "accepted connection \n"); new_con->in_tag = CEPH_MSGR_TAG_READY; @@ -1155,12 +1143,12 @@ static void try_accept(void *arg) add_connection_accepting(msgr, new_con); /* - * hand off to worker threads ,should be able to write, we want to + * hand off to worker threads ,should be able to write, we want to * try to write right away, we may have missed socket state change */ ceph_queue_write(new_con); done: - return; + return; } /* @@ -1168,21 +1156,17 @@ done: */ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) { - struct ceph_messenger *msgr; + struct ceph_messenger *msgr; int ret = 0; - msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); - if (msgr == NULL) + msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); + if (msgr == NULL) return ERR_PTR(-ENOMEM); -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) INIT_WORK(&msgr->awork, try_accept); -#else - INIT_WORK(&msgr->awork, try_accept, msgr); -#endif spin_lock_init(&msgr->con_lock); INIT_LIST_HEAD(&msgr->con_all); INIT_LIST_HEAD(&msgr->con_accepting); - INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC); /* we insert under spinlock */ + INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC); /* pick listening address */ if (myaddr) { @@ -1193,18 +1177,18 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) msgr->inst.addr.ipaddr.sin_port = htons(0); /* any port */ } msgr->inst.addr.ipaddr.sin_family = AF_INET; - + /* create listening socket */ ret = ceph_tcp_listen(msgr); if (ret < 0) { kfree(msgr); return ERR_PTR(ret); } - if (myaddr) + if (myaddr) msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr; dout(1, "create %p listening on %x:%d\n", msgr, - ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), + ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), ntohs(msgr->inst.addr.ipaddr.sin_port)); return msgr; } @@ -1219,7 +1203,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) spin_lock(&msgr->con_lock); while (!list_empty(&msgr->con_all)) { dout(1, "con_all isn't empty\n"); - con = list_entry(msgr->con_all.next, struct ceph_connection, list_all); + con = list_entry(msgr->con_all.next, struct ceph_connection, + list_all); __remove_connection(msgr, con); } spin_unlock(&msgr->con_lock); @@ -1233,7 +1218,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) /* * mark a peer down. drop any open connection. */ -void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) +void ceph_messenger_mark_down(struct ceph_messenger *msgr, + struct ceph_entity_addr *addr) { struct ceph_connection *con; @@ -1244,7 +1230,7 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_ad con = __get_connection(msgr, addr); if (con) { dout(10, "mark_down dropping %p\n", con); - set_bit(CLOSED, &con->state); /* in case there is queued work */ + set_bit(CLOSED, &con->state); /* in case there's queued work */ __remove_connection(msgr, con); put_connection(con); } @@ -1257,11 +1243,12 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_ad * * will take+drop msgr, then connection locks. */ -int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned long timeout) +int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, + unsigned long timeout) { struct ceph_connection *con, *newcon; int ret = 0; - + /* set source */ msg->hdr.src = msgr->inst; @@ -1278,43 +1265,46 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned lo con = __get_connection(msgr, &msg->hdr.dst.addr); if (con) { put_connection(newcon); - dout(10, "ceph_msg_send (lost race and) had connection %p to peer %u.%u.%u.%u:%u\n", con, + dout(10, "ceph_msg_send (lost race and) had connection " + "%p to peer %u.%u.%u.%u:%u\n", con, IPQUADPORT(msg->hdr.dst.addr.ipaddr)); } else { con = newcon; con->peer_addr = msg->hdr.dst.addr; con->peer_name = msg->hdr.dst.name; __add_connection(msgr, con); - dout(5, "ceph_msg_send new connection %p to peer %u.%u.%u.%u:%u\n", con, + dout(5, "ceph_msg_send new connection %p to peer " + "%u.%u.%u.%u:%u\n", con, IPQUADPORT(msg->hdr.dst.addr.ipaddr)); } } else { - dout(10, "ceph_msg_send had connection %p to peer %u.%u.%u.%u:%u\n", con, + dout(10, "ceph_msg_send had connection %p to peer " + "%u.%u.%u.%u:%u\n", con, IPQUADPORT(msg->hdr.dst.addr.ipaddr)); } spin_unlock(&msgr->con_lock); con->delay = timeout; - dout(10, "ceph_msg_send delay = %lu\n", con->delay); + dout(10, "ceph_msg_send delay = %lu\n", con->delay); /* queue */ spin_lock(&con->out_queue_lock); msg->hdr.seq = cpu_to_le64(++con->out_seq); dout(1, "----- %p to %s%d %d=%s len %d+%d -----\n", msg, - ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)), + ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)), le32_to_cpu(msg->hdr.dst.name.num), le32_to_cpu(msg->hdr.type), ceph_msg_type_name(le32_to_cpu(msg->hdr.type)), - le32_to_cpu(msg->hdr.front_len), + le32_to_cpu(msg->hdr.front_len), le32_to_cpu(msg->hdr.data_len)); - dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg, + dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg, le64_to_cpu(msg->hdr.seq), - ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)), + ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)), le32_to_cpu(msg->hdr.dst.name.num), con); ceph_msg_get(msg); list_add_tail(&msg->list_head, &con->out_queue); spin_unlock(&con->out_queue_lock); - - if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) + + if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) ceph_queue_write(con); put_connection(con); @@ -1323,10 +1313,11 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned lo } -/* +/* * construct a new message with given type, size */ -struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages) +struct ceph_msg *ceph_msg_new(int type, int front_len, + int page_len, int page_off, struct page **pages) { struct ceph_msg *m; @@ -1369,7 +1360,7 @@ void ceph_msg_put(struct ceph_msg *m) if (atomic_dec_and_test(&m->nref)) { dout(30, "ceph_msg_put last one on %p\n", m); BUG_ON(!list_empty(&m->list_head)); - if (m->front.iov_base) + if (m->front.iov_base) kfree(m->front.iov_base); kfree(m); } diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index fc0995f6301..c3d6b98ffcd 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -12,7 +12,8 @@ struct ceph_msg; typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m); typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_name *pn); -typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m, int want); +typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m, + int want); static __inline__ const char *ceph_name_type_str(int t) { switch (t) { @@ -37,7 +38,7 @@ struct ceph_messenger { spinlock_t con_lock; struct list_head con_all; /* all connections */ struct list_head con_accepting; /* doing handshake, or */ - struct radix_tree_root con_open; /* established. see get_connection() */ + struct radix_tree_root con_open; /* established */ }; struct ceph_msg { @@ -55,7 +56,7 @@ struct ceph_msg_pos { }; /* ceph connection fault delay defaults */ -#define BASE_DELAY_INTERVAL 1 +#define BASE_DELAY_INTERVAL 1 #define MAX_DELAY_INTERVAL (5U * 60 * HZ) /* ceph_connection state bit flags */ @@ -69,13 +70,13 @@ struct ceph_msg_pos { #define WAIT 7 /* wait for peer to connect */ #define CLOSED 8 /* we've closed the connection */ #define SOCK_CLOSE 9 /* socket state changed to close */ -#define STANDBY 10 /* standby, when socket state close, no message queued */ +#define STANDBY 10 /* standby, when socket state close, no messages */ struct ceph_connection { struct ceph_messenger *msgr; struct socket *sock; /* connection socket */ unsigned long state; /* connection state */ - + atomic_t nref; struct list_head list_all; /* msgr->con_all */ @@ -83,8 +84,8 @@ struct ceph_connection { struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ - __u32 connect_seq; - __le32 in_connect_seq, out_connect_seq; + __u32 connect_seq; + __le32 in_connect_seq, out_connect_seq; __u32 out_seq; /* last message queued for send */ __u32 in_seq, in_seq_acked; /* last message received, acked */ @@ -94,7 +95,7 @@ struct ceph_connection { /* out queue */ spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */ struct list_head out_queue; - struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */ + struct list_head out_sent; /* sending/sent but unacked */ __le32 out32; struct kvec out_kvec[4], @@ -105,36 +106,36 @@ struct ceph_connection { struct ceph_msg_pos out_msg_pos; /* partially read message contents */ - char in_tag; /* READY (accepting, or no in-progress read) or ACK or MSG */ - int in_base_pos; /* for ack seq, or msg headers, or accept handshake */ - __u32 in_partial_ack; + char in_tag; + int in_base_pos; /* for ack seq, or msg headers, or handshake */ + __u32 in_partial_ack; struct ceph_msg *in_msg; struct ceph_msg_pos in_msg_pos; struct work_struct rwork; /* receive work */ -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) struct delayed_work swork; /* send work */ -#else - struct work_struct swork; /* send work */ -#endif - unsigned long delay; /* delay interval */ - unsigned int retries; /* temp track of retries */ + unsigned long delay; /* delay interval */ + unsigned int retries; /* temp track of retries */ }; -extern struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr); +extern struct ceph_messenger * +ceph_messenger_create(struct ceph_entity_addr *myaddr); extern void ceph_messenger_destroy(struct ceph_messenger *); -extern void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr); +extern void ceph_messenger_mark_down(struct ceph_messenger *msgr, + struct ceph_entity_addr *addr); extern void ceph_queue_write(struct ceph_connection *con); extern void ceph_queue_read(struct ceph_connection *con); -extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages); +extern struct ceph_msg *ceph_msg_new(int type, int front_len, + int page_len, int page_off, + struct page **pages); static __inline__ void ceph_msg_get(struct ceph_msg *msg) { atomic_inc(&msg->nref); } extern void ceph_msg_put(struct ceph_msg *msg); -extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, +extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned long timeout); #endif