]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Add perf counter for each async worker
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 27 May 2015 16:47:57 +0000 (00:47 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sat, 30 May 2015 14:29:55 +0000 (22:29 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 8fcf4858ed15e44514081d24b30d17a1014bd4e0..146424dfe15097aa29b9f644e70d82d278e5aa24 100644 (file)
@@ -172,8 +172,8 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
   }
 }
 
-AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
-  : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0),
+AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
+  : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
     out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0),
     sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0),
     open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL),
@@ -192,6 +192,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
   // double recv_max_prefetch see "read_until"
   recv_buf = new char[2*recv_max_prefetch];
   state_buffer = new char[4096];
+  logger->inc(l_msgr_created_connections);
 }
 
 AsyncConnection::~AsyncConnection()
@@ -875,6 +876,8 @@ void AsyncConnection::process()
           } else {
             center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
           }
+          logger->inc(l_msgr_recv_messages);
+          logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
 
           break;
         }
@@ -1944,6 +1947,10 @@ int AsyncConnection::send_message(Message *m)
    return 0;
   }
 
+  // We deliberately skip local (loopback) messages here: they are too
+  // lightweight and counting them would skew the stats for users.
+  logger->inc(l_msgr_send_messages);
+
   bufferlist bl;
   Mutex::Locker l(write_lock);
   m->set_seq(out_seq.inc());
@@ -2206,7 +2213,6 @@ void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl)
                              << " data=" << header.data_len
                              << " off " << header.data_off << dendl;
 
-
   // Now that we have all the crcs calculated, handle the
   // digital signature for the message, if the AsyncConnection has session
   // security set up.  Some session security options do not
@@ -2266,6 +2272,7 @@ void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl)
     old_footer.flags = footer.flags;
     bl.append((char*)&old_footer, sizeof(old_footer));
   }
+  logger->inc(l_msgr_send_bytes, bl.length());
 }
 
 int AsyncConnection::write_message(Message *m, bufferlist& bl)
index a707135ddccb7bc4c0c0ab629adde9e8df9a1f8c..bf9df82863cbd63efd5f8e87e0771bd8f2fccb18 100644 (file)
@@ -25,6 +25,7 @@ using namespace std;
 
 #include "auth/AuthSessionHandler.h"
 #include "common/Mutex.h"
+#include "common/perf_counters.h"
 #include "include/buffer.h"
 #include "msg/Connection.h"
 #include "msg/Messenger.h"
@@ -113,7 +114,7 @@ class AsyncConnection : public Connection {
   }
 
  public:
-  AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
+  AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);
   ~AsyncConnection();
 
   ostream& _conn_prefix(std::ostream *_dout);
@@ -215,6 +216,7 @@ class AsyncConnection : public Connection {
   }
 
   AsyncMessenger *async_msgr;
+  PerfCounters *logger;
   int global_seq;
   __u32 connect_seq, peer_global_seq;
   atomic_t out_seq;
@@ -307,6 +309,9 @@ class AsyncConnection : public Connection {
     local_deliver_handler.reset();
     wakeup_handler.reset();
   }
+  PerfCounters *get_perf_counter() {
+    return logger;
+  }
 }; /* AsyncConnection */
 
 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
index 10e5a5aff089d3264c7b760cbee35c0c52ab1d4c..c9086957269d9930e14d731ea2832a9144225b3d 100644 (file)
@@ -334,6 +334,7 @@ WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false),
     else
       lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl;
   }
+
 }
 
 WorkerPool::~WorkerPool()
@@ -390,7 +391,8 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
 {
   ceph_spin_init(&global_seq_lock);
   cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
-  local_connection = new AsyncConnection(cct, this, &pool->get_worker()->center);
+  Worker *w = pool->get_worker();
+  local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
   init_local_connection();
 }
 
@@ -516,7 +518,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd)
 {
   lock.Lock();
   Worker *w = pool->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
   conn->accept(sd);
   accepting_conns.insert(conn);
   lock.Unlock();
@@ -533,10 +535,11 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
 
   // create connection
   Worker *w = pool->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
   conn->connect(addr, type);
   assert(!conns.count(addr));
   conns[addr] = conn;
+  w->get_perf_counter()->inc(l_msgr_active_connections);
 
   return conn;
 }
@@ -663,6 +666,7 @@ void AsyncMessenger::mark_down_all()
     AsyncConnectionRef p = it->second;
     ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
     conns.erase(it);
+    p->get_perf_counter()->dec(l_msgr_active_connections);
     p->stop();
   }
 
index a5e34b0d09e027ab1cdf4249767153bf6274fa9c..a61485bb44f183e1b49073d0a12d03832d597824 100644 (file)
@@ -41,6 +41,18 @@ using namespace std;
 class AsyncMessenger;
 class WorkerPool;
 
+enum {
+  l_msgr_first = 94000,
+  l_msgr_recv_messages,
+  l_msgr_send_messages,
+  l_msgr_recv_bytes,
+  l_msgr_send_bytes,
+  l_msgr_created_connections,
+  l_msgr_active_connections,
+  l_msgr_last,
+};
+
+
 class Worker : public Thread {
   static const uint64_t InitEventNumber = 5000;
   static const uint64_t EventMaxWaitUs = 30000000;
@@ -48,15 +60,37 @@ class Worker : public Thread {
   WorkerPool *pool;
   bool done;
   int id;
+  PerfCounters *perf_logger;
 
  public:
   EventCenter center;
   Worker(CephContext *c, WorkerPool *p, int i)
-    : cct(c), pool(p), done(false), id(i), center(c) {
+    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) {
     center.init(InitEventNumber);
+    char name[128];
+    sprintf(name, "AsyncMessenger::Worker-%d", id);
+    // initialize perf_logger
+    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
+
+    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
+    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
+    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
+    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes");
+    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
+    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
+
+    perf_logger = plb.create_perf_counters();
+    cct->get_perfcounters_collection()->add(perf_logger);
+  }
+  ~Worker() {
+    if (perf_logger) {
+      cct->get_perfcounters_collection()->remove(perf_logger);
+      delete perf_logger;
+    }
   }
   void *entry();
   void stop();
+  PerfCounters *get_perf_counter() { return perf_logger; }
 };
 
 /**
@@ -217,7 +251,7 @@ public:
   Connection *create_anon_connection() {
     Mutex::Locker l(lock);
     Worker *w = pool->get_worker();
-    return new AsyncConnection(cct, this, &w->center);
+    return new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
   }
 
   /**
@@ -350,6 +384,7 @@ private:
     Mutex::Locker l(deleted_lock);
     if (deleted_conns.count(p->second)) {
       deleted_conns.erase(p->second);
+      p->second->get_perf_counter()->dec(l_msgr_active_connections);
       conns.erase(p);
       return NULL;
     }
@@ -396,6 +431,7 @@ public:
       }
     }
     conns[conn->peer_addr] = conn;
+    conn->get_perf_counter()->inc(l_msgr_active_connections);
     accepting_conns.erase(conn);
     return 0;
   }