From 0001a07354f17c433090ed2106dc8024cbe33efd Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 28 Jun 2016 22:33:25 +0800 Subject: [PATCH] Revert "msg/AsyncMessenger: move Worker class to cc file" This reverts commit 23d4488e376de8f1a5c369365d91021ee8c9a324. --- src/msg/async/AsyncMessenger.cc | 59 --------------------------------- src/msg/async/AsyncMessenger.h | 55 ++++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 61 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 7b859a0660e43..1a5e22aebd2fd 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -50,52 +50,6 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) { } -class Worker : public Thread { - static const uint64_t InitEventNumber = 5000; - static const uint64_t EventMaxWaitUs = 30000000; - CephContext *cct; - WorkerPool *pool; - bool done; - int id; - PerfCounters *perf_logger; - - public: - EventCenter center; - std::atomic_uint references; - Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { - center.init(InitEventNumber, i); - char name[128]; - sprintf(name, "AsyncMessenger::Worker-%d", id); - // initialize perf_logger - PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); - - plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); - plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); - plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); - plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); - plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); - - perf_logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(perf_logger); - } - ~Worker() { - if (perf_logger) { - cct->get_perfcounters_collection()->remove(perf_logger); - delete perf_logger; - } - } - void *entry(); - void stop(); - PerfCounters *get_perf_counter() { return perf_logger; } - void release_worker() { - int oldref = references.fetch_sub(1); - assert(oldref > 0); - } -}; - /******************* * Processor */ @@ -875,15 +829,6 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) return 0; } -void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) { - Mutex::Locker l(deleted_lock); - deleted_conns.insert(conn); - - if (deleted_conns.size() >= ReapDeadConnectionThreshold) { - local_worker->center.dispatch_event_external(reap_handler); - } -} - void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { // be careful here: multiple threads may block here, and readers of @@ -928,7 +873,3 @@ int AsyncMessenger::reap_dead() return num; } - -void AsyncMessenger::release_worker(EventCenter* c) { - pool->release_worker(c); -} diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 8e65c14ff55e6..9ac8a1a5c66b8 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -56,7 +56,51 @@ enum { }; -class Worker; +class Worker : public Thread { + static const uint64_t InitEventNumber = 5000; + static const uint64_t EventMaxWaitUs = 30000000; + CephContext *cct; + WorkerPool *pool; + bool done; + int id; + PerfCounters *perf_logger; + + public: + EventCenter center; + std::atomic_uint references; + Worker(CephContext *c, WorkerPool *p, int i) + : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { + center.init(InitEventNumber, i); + char name[128]; + sprintf(name, "AsyncMessenger::Worker-%d", id); + // initialize perf_logger + PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); + + plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); + plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); + plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); + plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); + plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); + } + ~Worker() { + if (perf_logger) { + cct->get_perfcounters_collection()->remove(perf_logger); + delete perf_logger; + } + } + void *entry(); + void stop(); + PerfCounters *get_perf_counter() { return perf_logger; } + void release_worker() { + int oldref = references.fetch_sub(1); + assert(oldref > 0); + } +}; /** * If the Messenger binds to a specific address, the Processor runs @@ -444,7 +488,14 @@ public: * * See "deleted_conns" */ - void unregister_conn(AsyncConnectionRef conn); + void unregister_conn(AsyncConnectionRef conn) { + Mutex::Locker l(deleted_lock); + deleted_conns.insert(conn); + + if (deleted_conns.size() >= ReapDeadConnectionThreshold) { + local_worker->center.dispatch_event_external(reap_handler); + } + } /** * Reap dead connection from `deleted_conns` -- 2.39.5