boost::intrusive::list_member_hook<>,
&Message::dispatch_q>> Queue;
+ ceph::mono_time queue_start;
protected:
CompletionHook* completion_hook = nullptr; // owned by Messenger
<< " Drop message " << m << dendl;
m->put();
} else {
+ m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
out_q[m->get_priority()].emplace_back(std::move(bl), m);
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
prepare_send_message(connection->get_features(), m, data);
}
+ if (m->queue_start != ceph::mono_time()) {
+ connection->logger->tinc(l_msgr_send_messages_queue_lat,
+ ceph::mono_clock::now() - m->queue_start);
+ }
+
r = write_message(m, data, more);
connection->write_lock.lock();
// trim sent list
static const int max_pending = 128;
int i = 0;
+ auto now = ceph::mono_clock::now();
Message *pending[max_pending];
connection->write_lock.lock();
while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
<< dendl;
}
connection->write_lock.unlock();
+ connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
for (int k = 0; k < i; k++) {
pending[k]->put();
}
} else {
ldout(cct, 5) << __func__ << " enqueueing message m=" << m
<< " type=" << m->get_type() << " " << *m << dendl;
+ m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
out_queue[m->get_priority()].emplace_back(
out_queue_entry_t{is_prepared, m});
static const int max_pending = 128;
int i = 0;
Message *pending[max_pending];
+ auto now = ceph::mono_clock::now();
connection->write_lock.lock();
while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
Message *m = sent.front();
<< dendl;
}
connection->write_lock.unlock();
+ connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
for (int k = 0; k < i; k++) {
pending[k]->put();
}
prepare_send_message(connection->get_features(), out_entry.m);
}
+ if (out_entry.m->queue_start != ceph::mono_time()) {
+ connection->logger->tinc(l_msgr_send_messages_queue_lat,
+ ceph::mono_clock::now() -
+ out_entry.m->queue_start);
+ }
+
r = write_message(out_entry.m, more);
connection->write_lock.lock();
l_msgr_running_recv_time,
l_msgr_running_fast_dispatch_time,
+ l_msgr_send_messages_queue_lat,
+ l_msgr_handle_ack_lat,
+
l_msgr_last,
};
plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
+ plb.add_time_avg(l_msgr_send_messages_queue_lat, "msgr_send_messages_queue_lat", "Network sent messages lat");
+ plb.add_time_avg(l_msgr_handle_ack_lat, "msgr_handle_ack_lat", "Connection handle ack lat");
+
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
}