else if (tag == CEPH_MSGR_TAG_MSG) {
dout(20) << "reader got MSG" << dendl;
- Message *m = read_message();
+ Message *m = 0;
+ int r = read_message(&m);
pipe_lock.Lock();
if (!m) {
- derr(2) << "reader read null message, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault(false, true);
continue;
}
}
-Message *SimpleMessenger::Pipe::read_message()
+int SimpleMessenger::Pipe::read_message(Message **pm)
{
+ int ret = -1;
// envelope
//dout(10) << "receiver.read_message from sd " << sd << dendl;
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
- return 0;
+ return -1;
header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
} else {
ceph_msg_header_old oldheader;
if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
- return 0;
+ return -1;
// this is fugly
memcpy(&header, &oldheader, sizeof(header));
header.src = oldheader.src.name;
// verify header crc
if (header_crc != header.crc) {
dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
- return 0;
+ return -1;
}
- dout(10) << "getting message bytes now, currently using "
- << messenger->message_throttler.get_current() << "/"
- << messenger->message_throttler.get_max() << dendl;
- uint64_t message_size = header.front_len + header.middle_len
- + header.data_len;
+
+ bufferlist front, middle, data;
+ int front_len, middle_len;
+ unsigned data_len, data_off;
+ int aborted;
+ Message *message;
+
+ uint64_t message_size = header.front_len + header.middle_len + header.data_len;
if (message_size) {
- if (policy.throttler)
+ if (policy.throttler) {
+ dout(10) << "reader wants " << message_size << " from policy throttler "
+ << policy.throttler->get_current() << "/"
+ << policy.throttler->get_max() << dendl;
policy.throttler->get(message_size);
+ }
// throttle total bytes waiting for dispatch. do this _after_ the
// policy throttle, as this one does not deadlock (unless dispatch
// blocks indefinitely, which it shouldn't). in contrast, the
// policy throttle carries for the lifetime of the message.
+ dout(10) << "reader wants " << message_size << " from dispatch throttler "
+ << messenger->message_throttler.get_current() << "/"
+ << messenger->message_throttler.get_max() << dendl;
messenger->message_throttler.get(message_size);
}
// read front
- bufferlist front;
- int front_len = header.front_len;
+ front_len = header.front_len;
if (front_len) {
bufferptr bp = buffer::create(front_len);
if (tcp_read( sd, bp.c_str(), front_len ) < 0)
- return 0;
+ goto out_dethrottle;
front.push_back(bp);
dout(20) << "reader got front " << front.length() << dendl;
}
// read middle
- bufferlist middle;
- int middle_len = header.middle_len;
+ middle_len = header.middle_len;
if (middle_len) {
bufferptr bp = buffer::create(middle_len);
if (tcp_read( sd, bp.c_str(), middle_len ) < 0)
- return 0;
+ goto out_dethrottle;
middle.push_back(bp);
dout(20) << "reader got middle " << middle.length() << dendl;
}
// read data
- bufferlist data;
- unsigned data_len = le32_to_cpu(header.data_len);
- unsigned data_off = le32_to_cpu(header.data_off);
+ data_len = le32_to_cpu(header.data_len);
+ data_off = le32_to_cpu(header.data_off);
if (data_len) {
int left = data_len;
if (data_off & ~PAGE_MASK) {
(unsigned)left);
bufferptr bp = buffer::create(head);
if (tcp_read( sd, bp.c_str(), head ) < 0)
- return 0;
+ goto out_dethrottle;
data.push_back(bp);
left -= head;
dout(20) << "reader got data head " << head << dendl;
if (middle > 0) {
bufferptr bp = buffer::create_page_aligned(middle);
if (tcp_read( sd, bp.c_str(), middle ) < 0)
- return 0;
+ goto out_dethrottle;
data.push_back(bp);
left -= middle;
dout(20) << "reader got data page-aligned middle " << middle << dendl;
if (left) {
bufferptr bp = buffer::create(left);
if (tcp_read( sd, bp.c_str(), left ) < 0)
- return 0;
+ goto out_dethrottle;
data.push_back(bp);
dout(20) << "reader got data tail " << left << dendl;
}
// footer
if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
- return 0;
+ goto out_dethrottle;
- int aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
+ aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
dout(10) << "aborted = " << aborted << dendl;
if (aborted) {
dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message.. ABORTED" << dendl;
// MEH FIXME
- Message *m = new MGenericMessage(CEPH_MSG_PING);
+ *pm = new MGenericMessage(CEPH_MSG_PING);
header.type = CEPH_MSG_PING;
- m->set_header(header);
- return m;
+ (*pm)->set_header(header);
+ ret = 0;
+ goto out_dethrottle;
}
dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message" << dendl;
- Message *message = decode_message(header, footer, front, middle, data);
+ message = decode_message(header, footer, front, middle, data);
message->set_throttler(policy.throttler);
- return message;
+ *pm = message;
+ return 0;
+
+ out_dethrottle:
+ // release bytes reserved from the throttlers on failure
+ if (message_size) {
+ if (policy.throttler) {
+ dout(10) << "reader releasing " << message_size << " from policy throttler "
+ << policy.throttler->get_current() << "/"
+ << policy.throttler->get_max() << dendl;
+ policy.throttler->put(message_size);
+ }
+
+ // throttle total bytes waiting for dispatch. do this _after_ the
+ // policy throttle, as this one does not deadlock (unless dispatch
+ // blocks indefinitely, which it shouldn't). in contrast, the
+ // policy throttle carries for the lifetime of the message.
+ dout(10) << "reader releasing " << message_size << " from dispatch throttler "
+ << messenger->message_throttler.get_current() << "/"
+ << messenger->message_throttler.get_max() << dendl;
+ messenger->message_throttler.put(message_size);
+ }
+ return ret;
}