From 1610990c1b013887ef032543feb89d2962c8d1c6 Mon Sep 17 00:00:00 2001 From: Patience Warnick Date: Mon, 21 Jan 2008 17:29:20 -0800 Subject: [PATCH] Fixed up fault handling some, and some other bugs.. Merge branch 'rados' of ssh://patience@ceph.newdream.net/home/sage/ceph.newdream.net/git/ceph into rados Conflicts: src/kernel/messenger.c --- src/kernel/messenger.c | 171 ++++++++++++----------------------------- 1 file changed, 50 insertions(+), 121 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 4ccfe9ceff44f..fcd0dcbb1a6c2 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -22,7 +22,6 @@ static char tag_ack = CEPH_MSGR_TAG_ACK; static void try_read(struct work_struct *); static void try_write(struct work_struct *); -static void try_write_later(struct work_struct *); static void try_accept(struct work_struct *); @@ -54,8 +53,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr) INIT_LIST_HEAD(&con->out_sent); INIT_WORK(&con->rwork, try_read); - INIT_WORK(&con->swork, try_write); - INIT_DELAYED_WORK(&con->delaywork, try_write_later); + INIT_DELAYED_WORK(&con->swork, try_write); return con; } @@ -119,7 +117,8 @@ static void put_connection(struct ceph_connection *con) dout(20, "put_connection nref = %d\n", atomic_read(&con->nref)); if (atomic_dec_and_test(&con->nref)) { dout(20, "put_connection destroying %p\n", con); - sock_release(con->sock); + if (con->sock) + sock_release(con->sock); kfree(con); con = NULL; } @@ -224,25 +223,19 @@ static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connec */ void ceph_queue_write(struct ceph_connection *con) { - if (test_and_set_bit(WRITEABLE, &con->state) != 0) { - dout(40, "ceph_queue_write %p - already WRITEABLE\n", con); - return; - } - if (test_bit(WRITING, &con->state)) { - dout(40, "ceph_queue_write %p - still WRITING, queuing nothing\n", con); - return; - } dout(40, "ceph_queue_write %p\n", con); atomic_inc(&con->nref); - queue_work(send_wq, &con->swork); + if (!queue_work(send_wq, &con->swork.work)) { + dout(40, "ceph_queue_write %p - already queued\n", con); + put_connection(con); + } } void ceph_queue_delayed_write(struct ceph_connection *con) { dout(40, "ceph_queue_delayed_write %p delay %lu\n", con, con->delay); atomic_inc(&con->nref); - set_bit(WAITING, &con->state); - if (!queue_delayed_work(send_wq, &con->delaywork, con->delay)) { + if (!queue_delayed_work(send_wq, &con->swork, con->delay)) { dout(40, "ceph_queue_delayed_write %p - already queued\n", con); put_connection(con); } @@ -266,82 +259,28 @@ void ceph_queue_read(struct ceph_connection *con) * failure case * A retry mechanism is used with exponential backoff */ -static void ceph_send_fault(struct ceph_connection *con, int error) +static void ceph_send_fault(struct ceph_connection *con) { - derr(1, "ceph_send_fault %p state %lu error %d to peer %x:%d\n", - con, con->state, error, + derr(1, "ceph_send_fault %p state %lu to peer %x:%d\n", + con, con->state, ntohl(con->peer_addr.ipaddr.sin_addr.s_addr), ntohs(con->peer_addr.ipaddr.sin_port)); - if (!con->delay) { - /* - * lossy channel. don't bother with any retry. - */ - if (error == -EAGAIN) { - derr(1, "ceph_send_fault lossy channel, ignoring EAGAIN\n"); - return; - } - dout(1, "ceph_send_fault lossy channel, giving up\n"); - remove_connection(con->msgr, con); - return; + if (!test_and_clear_bit(CONNECTING, &con->state)){ + derr(1, "CONNECTING bit not set\n"); + /* TBD: reset buffer correctly */ + /* reset buffer */ + spin_lock(&con->out_queue_lock); + list_splice_init(&con->out_sent, &con->out_queue); + spin_unlock(&con->out_queue_lock); + clear_bit(OPEN, &con->state); } + set_bit(NEW, &con->state); + clear_bit(CLOSED, &con->state); + sock_release(con->sock); + /* retry with delay */ + ceph_queue_delayed_write(con); - switch (error) { - case -ETIMEDOUT: - derr(10, "timed out to peer %x:%d\n", - ntohl(con->peer_addr.ipaddr.sin_addr.s_addr), - ntohs(con->peer_addr.ipaddr.sin_port)); - /* peer unreachable, TBD: need to research if should retry - * a few times first, then close socket and try reconnecting */ - case -EHOSTDOWN: - case -EHOSTUNREACH: - case -ENETUNREACH: - derr(10, "ceph_send_fault peer unreachable via route state = %lu\n", con->state); - /* TBD: reset buffer correctly */ - /* spin_lock(&con->out_queue_lock); - list_splice_init(&con->out_sent, &con->out_queue); - spin_unlock(&con->out_queue_lock); */ - /* retry with delay */ - ceph_queue_delayed_write(con); - break; - case -EAGAIN: - /* no space in socket buffer, ceph_write_space will - * handle requeueing */ - if (test_bit(OPEN, &con->state)) - return; - case -EPIPE: - /* TBD: may do something different here, may not be allowed */ - case -ECONNREFUSED: - case -ECONNRESET: - /* never connected socket. SOCK_DONE flag not set */ - case -ENOTCONN: - derr(10, "ceph_send_fault no connection state = %lu\n", con->state); - /* TBD: setup timeout here */ - if (!test_and_clear_bit(CONNECTING, &con->state)){ - derr(1, "CONNECTING bit not set\n"); - /* TBD: reset buffer correctly */ - /* reset buffer */ - spin_lock(&con->out_queue_lock); - list_splice_init(&con->out_sent, - &con->out_queue); - spin_unlock(&con->out_queue_lock); - clear_bit(OPEN, &con->state); - } - set_bit(NEW, &con->state); - clear_bit(CLOSED, &con->state); - sock_release(con->sock); - /* retry with delay */ - ceph_queue_delayed_write(con); - break; - case -EIO: - derr(10, "ceph_send_fault EIO set\n"); - /* shutdown or soft timeout */ - - default: - /* if we ever hit here ... */ - derr(10, "ceph_send_fault unrecognized error %d\n", - error); - } if (con->delay < MAX_DELAY_INTERVAL) con->delay *= 2; else @@ -518,21 +457,14 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag) /* * worker function when socket is writeable */ -static void _try_write(struct ceph_connection *con) +static void try_write(struct work_struct *work) { + struct ceph_connection *con = container_of(work, struct ceph_connection, swork.work); struct ceph_messenger *msgr = con->msgr; int ret = 1; dout(30, "try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref)); -retry: - clear_bit(WAITING, &con->state); - if (test_and_set_bit(WRITING, &con->state)) { - dout(20, "try_write already writing\n"); - goto out; - } - clear_bit(WRITEABLE, &con->state); - /* Peer initiated close, other end is waiting for us to close */ if (test_and_clear_bit(CLOSING, &con->state)) { dout(5, "try_write peer initiated close\n"); @@ -540,11 +472,27 @@ retry: remove_connection(msgr, con); goto done; } + if (test_bit(CLOSED, &con->state)) { dout(5, "try_write closed\n"); remove_connection(msgr, con); goto done; } + + if (con->sock && con->sock->sk->sk_state == TCP_CLOSE) { + dout(5, "try_write TCP_CLOSE received\n"); + if (!con->delay) { + dout(5, "try_write tcp_close delay = 0\n"); + remove_connection(msgr, con); + goto done; + } else { + dout(5, "try_write tcp_close delay != 0\n"); + ceph_send_fault(con); + /* PW hmm, keep getting tcp_close so need to drop through.. + * I want to play around with it a little more.. */ + /* goto done; */ + } + } more: dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes); @@ -556,9 +504,7 @@ more: ret = ceph_tcp_connect(con); dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state); if (ret < 0) { - /* fault */ - derr(1, "try_write connect error\n"); - ceph_send_fault(con, ret); + derr(1, "try_write connect error, maybe we need to cleanup?\n"); goto done; } } @@ -574,7 +520,8 @@ more: remove_connection(msgr, con); } if (ret < 0) { - ceph_send_fault(con, ret); + /* TBD: seems to always close on error, so remove this once we're sure */ + /* ceph_send_fault(con, ret); */ goto done; /* error */ } } @@ -585,7 +532,8 @@ more: if (ret == 0) goto done; if (ret < 0) { - ceph_send_fault(con, ret); + /* TBD: seems to always close on error, so remove this once we're sure */ + /* ceph_send_fault(con, ret); */ goto done; } } @@ -601,39 +549,19 @@ more: /* hmm, nothing to do! No more writes pending? */ dout(30, "try_write nothing else to write.\n"); spin_unlock(&con->out_queue_lock); + /* TBD PW remove from wait queue if still there.. and put_connection */ + if (con->delay > 1) con->delay = BASE_DELAY_INTERVAL; goto done; } spin_unlock(&con->out_queue_lock); goto more; done: - clear_bit(WRITING, &con->state); - if (test_bit(WRITEABLE, &con->state) && !test_bit(WAITING, &con->state)) { - dout(30, "try_write writeable flag set again, looping\n"); - goto retry; - } - -out: dout(30, "try_write done on %p\n", con); put_connection(con); return; } -static void try_write_later(struct work_struct *work) -{ - struct ceph_connection *con; - con = container_of(work, struct ceph_connection, delaywork.work); - _try_write(con); -} - -static void try_write(struct work_struct *work) -{ - struct ceph_connection *con; - con = container_of(work, struct ceph_connection, swork); - _try_write(con); -} - - /* * prepare to read a message */ @@ -1131,6 +1059,7 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_ad dout(10, "mark_down dropping %p\n", con); set_bit(CLOSED, &con->state); /* in case there is queued work */ __remove_connection(msgr, con); + put_connection(con); } spin_unlock(&msgr->con_lock); } -- 2.39.5