}
-class Worker : public Thread {
- static const uint64_t InitEventNumber = 5000;
- static const uint64_t EventMaxWaitUs = 30000000;
- CephContext *cct;
- WorkerPool *pool;
- bool done;
- int id;
- PerfCounters *perf_logger;
-
- public:
- EventCenter center;
- std::atomic_uint references;
- Worker(CephContext *c, WorkerPool *p, int i)
- : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
- center.init(InitEventNumber, i);
- char name[128];
- sprintf(name, "AsyncMessenger::Worker-%d", id);
- // initialize perf_logger
- PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
-
- plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
- plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
- plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
- plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
- plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
- plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
- plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
-
- perf_logger = plb.create_perf_counters();
- cct->get_perfcounters_collection()->add(perf_logger);
- }
- ~Worker() {
- if (perf_logger) {
- cct->get_perfcounters_collection()->remove(perf_logger);
- delete perf_logger;
- }
- }
- void *entry();
- void stop();
- PerfCounters *get_perf_counter() { return perf_logger; }
- void release_worker() {
- int oldref = references.fetch_sub(1);
- assert(oldref > 0);
- }
-};
-
/*******************
* Processor
*/
return 0;
}
-void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) {
- Mutex::Locker l(deleted_lock);
- deleted_conns.insert(conn);
-
- if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
- local_worker->center.dispatch_event_external(reap_handler);
- }
-}
-
void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
// be careful here: multiple threads may block here, and readers of
return num;
}
-
-void AsyncMessenger::release_worker(EventCenter* c) {
- pool->release_worker(c);
-}
};
-class Worker;
+class Worker : public Thread {
+ static const uint64_t InitEventNumber = 5000;
+ static const uint64_t EventMaxWaitUs = 30000000;
+ CephContext *cct;
+ WorkerPool *pool;
+ bool done;
+ int id;
+ PerfCounters *perf_logger;
+
+ public:
+ EventCenter center;
+ std::atomic_uint references;
+ Worker(CephContext *c, WorkerPool *p, int i)
+ : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
+ center.init(InitEventNumber, i);
+ char name[128];
+ sprintf(name, "AsyncMessenger::Worker-%d", id);
+ // initialize perf_logger
+ PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
+
+ plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
+ plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
+ plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
+ plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
+ plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
+ plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
+ plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
+
+ perf_logger = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(perf_logger);
+ }
+ ~Worker() {
+ if (perf_logger) {
+ cct->get_perfcounters_collection()->remove(perf_logger);
+ delete perf_logger;
+ }
+ }
+ void *entry();
+ void stop();
+ PerfCounters *get_perf_counter() { return perf_logger; }
+ void release_worker() {
+ int oldref = references.fetch_sub(1);
+ assert(oldref > 0);
+ }
+};
/**
* If the Messenger binds to a specific address, the Processor runs
*
* See "deleted_conns"
*/
- void unregister_conn(AsyncConnectionRef conn);
+ void unregister_conn(AsyncConnectionRef conn) {
+ Mutex::Locker l(deleted_lock);
+ deleted_conns.insert(conn);
+
+ if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
+ local_worker->center.dispatch_event_external(reap_handler);
+ }
+ }
/**
* Reap dead connection from `deleted_conns`