bufferlist middle; // "middle" unaligned blob
bufferlist data; // data payload (page-alignment will be preserved where possible)
+ /* recv_stamp is set when the Messenger starts reading the
+ * Message off the wire */
utime_t recv_stamp;
+ /* dispatch_stamp is set when the Messenger starts calling dispatch() on
+ * its endpoints */
+ utime_t dispatch_stamp;
+ /* throttle_wait is the amount of time spent waiting on throttlers between
+ * message receipt and message dispatch*/
+ utime_t throttle_wait;
+
Connection *connection;
// release our size in bytes back to this throttler when our payload
off_t get_data_len() { return data.length(); }
void set_recv_stamp(utime_t t) { recv_stamp = t; }
- utime_t get_recv_stamp() { return recv_stamp; }
+ const utime_t& get_recv_stamp() { return recv_stamp; }
+ void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
+ const utime_t& get_dispatch_stamp() { return dispatch_stamp; }
+ void set_throttle_wait(utime_t t) { throttle_wait = t; }
+ const utime_t& get_throttle_wait() { return throttle_wait; }
void calc_header_crc() {
header.crc = ceph_crc32c_le(0, (unsigned char*)&header,
// dispatch incoming messages
void ms_deliver_dispatch(Message *m) {
+ m->set_dispatch_stamp(ceph_clock_now(cct));
for (list<Dispatcher*>::iterator p = dispatchers.begin();
p != dispatchers.end();
p++)
unsigned data_len, data_off;
int aborted;
Message *message;
+ utime_t recv_stamp = ceph_clock_now(msgr->cct);
+ bool waited_on_throttle = false;
uint64_t message_size = header.front_len + header.middle_len + header.data_len;
if (message_size) {
ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler "
<< policy.throttler->get_current() << "/"
<< policy.throttler->get_max() << dendl;
- policy.throttler->get(message_size);
+ waited_on_throttle = policy.throttler->get(message_size);
}
// throttle total bytes waiting for dispatch. do this _after_ the
ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
<< msgr->dispatch_throttler.get_current() << "/"
<< msgr->dispatch_throttler.get_max() << dendl;
- msgr->dispatch_throttler.get(message_size);
+ waited_on_throttle |= msgr->dispatch_throttler.get(message_size);
+ }
+
+ utime_t throttle_wait;
+ if (waited_on_throttle) {
+ throttle_wait = ceph_clock_now(msgr->cct) - recv_stamp;
}
// read front
// by messages entering the dispatch queue through other paths.
message->set_dispatch_throttle_size(message_size);
+ message->set_recv_stamp(recv_stamp);
+ message->set_throttle_wait(throttle_wait);
+
*pm = message;
return 0;
void queue_received(Message *m, int priority);
void queue_received(Message *m) {
- m->set_recv_stamp(ceph_clock_now(msgr->cct));
-
// this is just to make sure that a changeset is working
// properly; if you start using the refcounting more and have
// multiple people hanging on to a message, ditch the assert!