}
}
-AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
- : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0),
+AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
+ : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0),
sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0),
open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL),
// double recv_max_prefetch see "read_until"
recv_buf = new char[2*recv_max_prefetch];
state_buffer = new char[4096];
+ logger->inc(l_msgr_created_connections);
}
AsyncConnection::~AsyncConnection()
} else {
center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
}
+ logger->inc(l_msgr_recv_messages);
+ logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
break;
}
return 0;
}
+ // we don't want to consider local message here, it's too lightweight which
+ // may disturb users
+ logger->inc(l_msgr_send_messages);
+
bufferlist bl;
Mutex::Locker l(write_lock);
m->set_seq(out_seq.inc());
<< " data=" << header.data_len
<< " off " << header.data_off << dendl;
-
// Now that we have all the crcs calculated, handle the
// digital signature for the message, if the AsyncConnection has session
// security set up. Some session security options do not
old_footer.flags = footer.flags;
bl.append((char*)&old_footer, sizeof(old_footer));
}
+ logger->inc(l_msgr_send_bytes, bl.length());
}
int AsyncConnection::write_message(Message *m, bufferlist& bl)
#include "auth/AuthSessionHandler.h"
#include "common/Mutex.h"
+#include "common/perf_counters.h"
#include "include/buffer.h"
#include "msg/Connection.h"
#include "msg/Messenger.h"
}
public:
- AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
+ AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);
~AsyncConnection();
ostream& _conn_prefix(std::ostream *_dout);
}
AsyncMessenger *async_msgr;
+ PerfCounters *logger;
int global_seq;
__u32 connect_seq, peer_global_seq;
atomic_t out_seq;
local_deliver_handler.reset();
wakeup_handler.reset();
}
+ PerfCounters *get_perf_counter() {
+ return logger;
+ }
}; /* AsyncConnection */
typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
else
lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl;
}
+
}
WorkerPool::~WorkerPool()
{
ceph_spin_init(&global_seq_lock);
cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
- local_connection = new AsyncConnection(cct, this, &pool->get_worker()->center);
+ Worker *w = pool->get_worker();
+ local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
init_local_connection();
}
{
lock.Lock();
Worker *w = pool->get_worker();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
conn->accept(sd);
accepting_conns.insert(conn);
lock.Unlock();
// create connection
Worker *w = pool->get_worker();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
conn->connect(addr, type);
assert(!conns.count(addr));
conns[addr] = conn;
+ w->get_perf_counter()->inc(l_msgr_active_connections);
return conn;
}
AsyncConnectionRef p = it->second;
ldout(cct, 5) << __func__ << " mark down " << it->first << " " << p << dendl;
conns.erase(it);
+ p->get_perf_counter()->dec(l_msgr_active_connections);
p->stop();
}
class AsyncMessenger;
class WorkerPool;
+enum {
+ l_msgr_first = 94000,
+ l_msgr_recv_messages,
+ l_msgr_send_messages,
+ l_msgr_recv_bytes,
+ l_msgr_send_bytes,
+ l_msgr_created_connections,
+ l_msgr_active_connections,
+ l_msgr_last,
+};
+
+
class Worker : public Thread {
static const uint64_t InitEventNumber = 5000;
static const uint64_t EventMaxWaitUs = 30000000;
WorkerPool *pool;
bool done;
int id;
+ PerfCounters *perf_logger;
public:
EventCenter center;
Worker(CephContext *c, WorkerPool *p, int i)
- : cct(c), pool(p), done(false), id(i), center(c) {
+ : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) {
center.init(InitEventNumber);
+ char name[128];
+ sprintf(name, "AsyncMessenger::Worker-%d", id);
+ // initialize perf_logger
+ PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
+
+ plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
+ plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
+ plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
+ plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
+ plb.add_u64_counter(l_msgr_created_connections, "msgr_active_connections", "Active connection number");
+ plb.add_u64_counter(l_msgr_active_connections, "msgr_created_connections", "Created connection number");
+
+ perf_logger = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(perf_logger);
+ }
+ ~Worker() {
+ if (perf_logger) {
+ cct->get_perfcounters_collection()->remove(perf_logger);
+ delete perf_logger;
+ }
}
void *entry();
void stop();
+ PerfCounters *get_perf_counter() { return perf_logger; }
};
/**
Connection *create_anon_connection() {
Mutex::Locker l(lock);
Worker *w = pool->get_worker();
- return new AsyncConnection(cct, this, &w->center);
+ return new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
}
/**
Mutex::Locker l(deleted_lock);
if (deleted_conns.count(p->second)) {
deleted_conns.erase(p->second);
+ p->second->get_perf_counter()->dec(l_msgr_active_connections);
conns.erase(p);
return NULL;
}
}
}
conns[conn->peer_addr] = conn;
+ conn->get_perf_counter()->inc(l_msgr_active_connections);
accepting_conns.erase(conn);
return 0;
}