From: Haomai Wang Date: Wed, 27 May 2015 16:47:57 +0000 (+0800) Subject: AsyncMessenger: Add perf counter for each async worker X-Git-Tag: v9.0.2~23^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=19f681f776bb11c8270bd5ca9f2465263b95a1d1;p=ceph.git AsyncMessenger: Add perf counter for each async worker Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 8fcf4858ed15..146424dfe150 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -172,8 +172,8 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) } } -AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c) - : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0), +AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p) + : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0), open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL), @@ -192,6 +192,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente // double recv_max_prefetch see "read_until" recv_buf = new char[2*recv_max_prefetch]; state_buffer = new char[4096]; + logger->inc(l_msgr_created_connections); } AsyncConnection::~AsyncConnection() @@ -875,6 +876,8 @@ void AsyncConnection::process() } else { center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message))); } + logger->inc(l_msgr_recv_messages); + logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer)); break; } @@ -1944,6 +1947,10 @@ int AsyncConnection::send_message(Message *m) return 0; } + // we don't want to consider local message here, it's too lightweight which + // may disturb users + logger->inc(l_msgr_send_messages); + bufferlist bl; Mutex::Locker l(write_lock); m->set_seq(out_seq.inc()); @@ -2206,7 +2213,6 @@ void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl) << " data=" << header.data_len << " off " << header.data_off << dendl; - // Now that we have all the crcs calculated, handle the // digital signature for the message, if the AsyncConnection has session // security set up. Some session security options do not @@ -2266,6 +2272,7 @@ void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl) old_footer.flags = footer.flags; bl.append((char*)&old_footer, sizeof(old_footer)); } + logger->inc(l_msgr_send_bytes, bl.length()); } int AsyncConnection::write_message(Message *m, bufferlist& bl) diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index a707135ddccb..bf9df82863cb 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -25,6 +25,7 @@ using namespace std; #include "auth/AuthSessionHandler.h" #include "common/Mutex.h" +#include "common/perf_counters.h" #include "include/buffer.h" #include "msg/Connection.h" #include "msg/Messenger.h" @@ -113,7 +114,7 @@ class AsyncConnection : public Connection { } public: - AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c); + AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p); ~AsyncConnection(); ostream& _conn_prefix(std::ostream *_dout); @@ -215,6 +216,7 @@ class AsyncConnection : public Connection { } AsyncMessenger *async_msgr; + PerfCounters *logger; int global_seq; __u32 connect_seq, peer_global_seq; atomic_t out_seq; @@ -307,6 +309,9 @@ class AsyncConnection : public Connection { local_deliver_handler.reset(); wakeup_handler.reset(); } + PerfCounters *get_perf_counter() { + return logger; + } }; /* AsyncConnection */ typedef boost::intrusive_ptr AsyncConnectionRef; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 10e5a5aff089..c9086957269d 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -334,6 +334,7 @@ WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false), else lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl; } + } WorkerPool::~WorkerPool() @@ -390,7 +391,8 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, { ceph_spin_init(&global_seq_lock); cct->lookup_or_create_singleton_object(pool, WorkerPool::name); - local_connection = new AsyncConnection(cct, this, &pool->get_worker()->center); + Worker *w = pool->get_worker(); + local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter()); init_local_connection(); } @@ -516,7 +518,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd) { lock.Lock(); Worker *w = pool->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center, w->get_perf_counter()); conn->accept(sd); accepting_conns.insert(conn); lock.Unlock(); @@ -533,10 +535,11 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int // create connection Worker *w = pool->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center, w->get_perf_counter()); conn->connect(addr, type); assert(!conns.count(addr)); conns[addr] = conn; + w->get_perf_counter()->inc(l_msgr_active_connections); return conn; } @@ -663,6 +666,7 @@ void AsyncMessenger::mark_down_all() AsyncConnectionRef p = it->second; ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl; conns.erase(it); + p->get_perf_counter()->dec(l_msgr_active_connections); p->stop(); } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index a5e34b0d09e0..a61485bb44f1 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -41,6 +41,18 @@ using namespace std; class AsyncMessenger; class WorkerPool; +enum { + l_msgr_first = 94000, + l_msgr_recv_messages, + l_msgr_send_messages, + l_msgr_recv_bytes, + l_msgr_send_bytes, + l_msgr_created_connections, + l_msgr_active_connections, + l_msgr_last, +}; + + class Worker : public Thread { static const uint64_t InitEventNumber = 5000; static const uint64_t EventMaxWaitUs = 30000000; @@ -48,15 +60,37 @@ class Worker : public Thread { WorkerPool *pool; bool done; int id; + PerfCounters *perf_logger; public: EventCenter center; Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), center(c) { + : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) { center.init(InitEventNumber); + char name[128]; + sprintf(name, "AsyncMessenger::Worker-%d", id); + // initialize perf_logger + PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); + + plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); + plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); + plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_created_connections, "msgr_active_connections", "Active connection number"); + plb.add_u64_counter(l_msgr_active_connections, "msgr_created_connections", "Created connection number"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); + } + ~Worker() { + if (perf_logger) { + cct->get_perfcounters_collection()->remove(perf_logger); + delete perf_logger; + } } void *entry(); void stop(); + PerfCounters *get_perf_counter() { return perf_logger; } }; /** @@ -217,7 +251,7 @@ public: Connection *create_anon_connection() { Mutex::Locker l(lock); Worker *w = pool->get_worker(); - return new AsyncConnection(cct, this, &w->center); + return new AsyncConnection(cct, this, &w->center, w->get_perf_counter()); } /** @@ -350,6 +384,7 @@ private: Mutex::Locker l(deleted_lock); if (deleted_conns.count(p->second)) { deleted_conns.erase(p->second); + p->second->get_perf_counter()->dec(l_msgr_active_connections); conns.erase(p); return NULL; } @@ -396,6 +431,7 @@ public: } } conns[conn->peer_addr] = conn; + conn->get_perf_counter()->inc(l_msgr_active_connections); accepting_conns.erase(conn); return 0; }