From 49cebc00d964dff61d93eac71b01d4ea169e0673 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 16 Jan 2008 17:52:13 -0800 Subject: [PATCH] mark_down. some cleanup. --- src/kernel/messenger.c | 51 +++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 405a1ce49ad85..4ccfe9ceff44f 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -173,9 +173,8 @@ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connect unsigned long key; dout(20, "__remove_connection %p from %p\n", con, msgr); - dout(10, "list_all next %p prev %p\n", con->list_all.next, con->list_all.prev); if (list_empty(&con->list_all)) { - dout(20, "__remove_connect not registered\n"); + dout(20, "__remove_connection %p not registered\n", con); return; } list_del_init(&con->list_all); @@ -615,7 +614,7 @@ done: } out: - dout(30, "try_write done\n"); + dout(30, "try_write done on %p\n", con); put_connection(con); return; } @@ -934,8 +933,8 @@ void try_read(struct work_struct *work) struct ceph_connection *con; struct ceph_messenger *msgr; - dout(20, "Entering try_read\n"); con = container_of(work, struct ceph_connection, rwork); + dout(20, "try_read start on %p\n", con); msgr = con->msgr; retry: @@ -1016,7 +1015,7 @@ done: } out: - dout(20, "Exited try_read\n"); + dout(20, "try_read done on %p\n", con); put_connection(con); return; } @@ -1103,7 +1102,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) /* kill off connections */ spin_lock(&msgr->con_lock); while (!list_empty(&msgr->con_all)) { - dout(1, "con_all isn't empty\n"); + dout(1, "con_all isn't empty\n"); con = list_entry(msgr->con_all.next, struct ceph_connection, list_all); __remove_connection(msgr, con); } @@ -1115,13 +1114,25 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) kfree(msgr); } +/* + * mark a peer down. drop any open connection. + */ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr) { + struct ceph_connection *con; + dout(1, "mark_down peer %x:%d\n", ntohl(addr->ipaddr.sin_addr.s_addr), ntohs(addr->ipaddr.sin_port)); - dout(1, "mark_down --- IMPLEMENT ME\n"); + spin_lock(&msgr->con_lock); + con = __get_connection(msgr, addr); + if (con) { + dout(10, "mark_down dropping %p\n", con); + set_bit(CLOSED, &con->state); /* in case there is queued work */ + __remove_connection(msgr, con); + } + spin_unlock(&msgr->con_lock); } @@ -1130,8 +1141,7 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_ad * * will take+drop msgr, then connection locks. */ -int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, - unsigned long timeout) +int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned long timeout) { struct ceph_connection *con; int ret = 0; @@ -1159,6 +1169,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, spin_unlock(&msgr->con_lock); con->delay = timeout; dout(10, "ceph_msg_send delay = %lu\n", con->delay); + /* queue */ spin_lock(&con->out_queue_lock); msg->hdr.seq = ++con->out_seq; @@ -1167,7 +1178,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, ceph_msg_get(msg); list_add_tail(&msg->list_head, &con->out_queue); spin_unlock(&con->out_queue_lock); - + if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) ceph_queue_write(con); @@ -1177,7 +1188,9 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, } - +/* + * construct a new message with given type, size + */ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages) { struct ceph_msg *m; @@ -1204,19 +1217,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of /* pages */ m->nr_pages = calc_pages_for(page_len, page_off); m->pages = pages; - /* - if (m->nr_pages) { - int i; - kzalloc(m->nr_pages*sizeof(*m->pages), GFP_KERNEL); - for (i=0; inr_pages; i++) { - m->pages[i] = alloc_page(GFP_KERNEL); - if (m->pages[i] == NULL) - goto out2; - } - } else { - m->pages = 0; - } - */ INIT_LIST_HEAD(&m->list_head); return m; @@ -1232,9 +1232,8 @@ void ceph_msg_put(struct ceph_msg *m) if (atomic_dec_and_test(&m->nref)) { dout(30, "ceph_msg_put last one on %p\n", m); BUG_ON(!list_empty(&m->list_head)); - if (m->front.iov_base) { + if (m->front.iov_base) kfree(m->front.iov_base); - } kfree(m); } } -- 2.39.5