From 7cedafd3e32b78daa5699cffd04f47d3f1204245 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 30 Jun 2010 12:08:18 -0700 Subject: [PATCH] msgr: release bytes reserved from throttlers in failure paths If we don't release those bytes, the throttler count eventually fills up with bytes we were going to read but didn't (due to socket errors, etc) until we can't read anything. Signed-off-by: Sage Weil --- src/msg/SimpleMessenger.cc | 93 +++++++++++++++++++++++++------------- src/msg/SimpleMessenger.h | 2 +- 2 files changed, 63 insertions(+), 32 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 937f1764f94fc..1c65cb621e10c 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1500,12 +1500,12 @@ void SimpleMessenger::Pipe::reader() else if (tag == CEPH_MSGR_TAG_MSG) { dout(20) << "reader got MSG" << dendl; - Message *m = read_message(); + Message *m = 0; + int r = read_message(&m); pipe_lock.Lock(); if (!m) { - derr(2) << "reader read null message, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(false, true); continue; } @@ -1701,8 +1701,9 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() } -Message *SimpleMessenger::Pipe::read_message() +int SimpleMessenger::Pipe::read_message(Message **pm) { + int ret = -1; // envelope //dout(10) << "receiver.read_message from sd " << sd << dendl; @@ -1712,12 +1713,12 @@ Message *SimpleMessenger::Pipe::read_message() if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) - return 0; + return -1; header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); } else { ceph_msg_header_old oldheader; if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0) - return 0; + return -1; // this is fugly memcpy(&header, &oldheader, sizeof(header)); header.src = oldheader.src.name; @@ -1736,51 +1737,58 @@ Message *SimpleMessenger::Pipe::read_message() // verify header crc if (header_crc != header.crc) { dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; - return 0; + return -1; } - dout(10) << "getting message bytes now, currently using " - << messenger->message_throttler.get_current() << "/" - << messenger->message_throttler.get_max() << dendl; - uint64_t message_size = header.front_len + header.middle_len - + header.data_len; + + bufferlist front, middle, data; + int front_len, middle_len; + unsigned data_len, data_off; + int aborted; + Message *message; + + uint64_t message_size = header.front_len + header.middle_len + header.data_len; if (message_size) { - if (policy.throttler) + if (policy.throttler) { + dout(10) << "reader wants " << message_size << " from policy throttler " + << policy.throttler->get_current() << "/" + << policy.throttler->get_max() << dendl; policy.throttler->get(message_size); + } // throttle total bytes waiting for dispatch. do this _after_ the // policy throttle, as this one does not deadlock (unless dispatch // blocks indefinitely, which it shouldn't). in contrast, the // policy throttle carries for the lifetime of the message. + dout(10) << "reader wants " << message_size << " from dispatch throttler " + << messenger->message_throttler.get_current() << "/" + << messenger->message_throttler.get_max() << dendl; messenger->message_throttler.get(message_size); } // read front - bufferlist front; - int front_len = header.front_len; + front_len = header.front_len; if (front_len) { bufferptr bp = buffer::create(front_len); if (tcp_read( sd, bp.c_str(), front_len ) < 0) - return 0; + goto out_dethrottle; front.push_back(bp); dout(20) << "reader got front " << front.length() << dendl; } // read middle - bufferlist middle; - int middle_len = header.middle_len; + middle_len = header.middle_len; if (middle_len) { bufferptr bp = buffer::create(middle_len); if (tcp_read( sd, bp.c_str(), middle_len ) < 0) - return 0; + goto out_dethrottle; middle.push_back(bp); dout(20) << "reader got middle " << middle.length() << dendl; } // read data - bufferlist data; - unsigned data_len = le32_to_cpu(header.data_len); - unsigned data_off = le32_to_cpu(header.data_off); + data_len = le32_to_cpu(header.data_len); + data_off = le32_to_cpu(header.data_off); if (data_len) { int left = data_len; if (data_off & ~PAGE_MASK) { @@ -1789,7 +1797,7 @@ Message *SimpleMessenger::Pipe::read_message() (unsigned)left); bufferptr bp = buffer::create(head); if (tcp_read( sd, bp.c_str(), head ) < 0) - return 0; + goto out_dethrottle; data.push_back(bp); left -= head; dout(20) << "reader got data head " << head << dendl; @@ -1800,7 +1808,7 @@ Message *SimpleMessenger::Pipe::read_message() if (middle > 0) { bufferptr bp = buffer::create_page_aligned(middle); if (tcp_read( sd, bp.c_str(), middle ) < 0) - return 0; + goto out_dethrottle; data.push_back(bp); left -= middle; dout(20) << "reader got data page-aligned middle " << middle << dendl; @@ -1809,7 +1817,7 @@ Message *SimpleMessenger::Pipe::read_message() if (left) { bufferptr bp = buffer::create(left); if (tcp_read( sd, bp.c_str(), left ) < 0) - return 0; + goto out_dethrottle; data.push_back(bp); dout(20) << "reader got data tail " << left << dendl; } @@ -1817,25 +1825,48 @@ Message *SimpleMessenger::Pipe::read_message() // footer if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) - return 0; + goto out_dethrottle; - int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; + aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; dout(10) << "aborted = " << aborted << dendl; if (aborted) { dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() << " byte message.. ABORTED" << dendl; // MEH FIXME - Message *m = new MGenericMessage(CEPH_MSG_PING); + *pm = new MGenericMessage(CEPH_MSG_PING); header.type = CEPH_MSG_PING; - m->set_header(header); - return m; + (*pm)->set_header(header); + ret = 0; + goto out_dethrottle; } dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() << " byte message" << dendl; - Message *message = decode_message(header, footer, front, middle, data); + message = decode_message(header, footer, front, middle, data); message->set_throttler(policy.throttler); - return message; + *pm = message; + return 0; + + out_dethrottle: + // release bytes reserved from the throttlers on failure + if (message_size) { + if (policy.throttler) { + dout(10) << "reader releasing " << message_size << " from policy throttler " + << policy.throttler->get_current() << "/" + << policy.throttler->get_max() << dendl; + policy.throttler->put(message_size); + } + + // throttle total bytes waiting for dispatch. do this _after_ the + // policy throttle, as this one does not deadlock (unless dispatch + // blocks indefinitely, which it shouldn't). in contrast, the + // policy throttle carries for the lifetime of the message. + dout(10) << "reader releasing " << message_size << " from dispatch throttler " + << messenger->message_throttler.get_current() << "/" + << messenger->message_throttler.get_max() << dendl; + messenger->message_throttler.put(message_size); + } + return ret; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 023f16b6bc4ba..3edfd7ae9b3d0 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -149,7 +149,7 @@ private: void writer(); void unlock_maybe_reap(); - Message *read_message(); + int read_message(Message **pm); int write_message(Message *m); int do_sendmsg(int sd, struct msghdr *msg, int len, bool more=false); int write_ack(uint64_t s); -- 2.39.5