]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
Revert "msg/AsyncMessenger: move Worker class to cc file"
authorHaomai Wang <haomai@xsky.com>
Tue, 28 Jun 2016 14:33:25 +0000 (22:33 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 29 Jun 2016 04:14:29 +0000 (12:14 +0800)
This reverts commit 23d4488e376de8f1a5c369365d91021ee8c9a324.

src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 7b859a0660e43c59ab5395f05b5279168ae5876e..1a5e22aebd2fd2fdd898c0ae47543dae4321c42c 100644 (file)
@@ -50,52 +50,6 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
 }
 
 
-class Worker : public Thread {
-  static const uint64_t InitEventNumber = 5000;
-  static const uint64_t EventMaxWaitUs = 30000000;
-  CephContext *cct;
-  WorkerPool *pool;
-  bool done;
-  int id;
-  PerfCounters *perf_logger;
-
- public:
-  EventCenter center;
-  std::atomic_uint references;
-  Worker(CephContext *c, WorkerPool *p, int i)
-    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
-    center.init(InitEventNumber, i);
-    char name[128];
-    sprintf(name, "AsyncMessenger::Worker-%d", id);
-    // initialize perf_logger
-    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
-
-    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
-    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
-    plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
-    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
-    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
-
-    perf_logger = plb.create_perf_counters();
-    cct->get_perfcounters_collection()->add(perf_logger);
-  }
-  ~Worker() {
-    if (perf_logger) {
-      cct->get_perfcounters_collection()->remove(perf_logger);
-      delete perf_logger;
-    }
-  }
-  void *entry();
-  void stop();
-  PerfCounters *get_perf_counter() { return perf_logger; }
-  void release_worker() {
-    int oldref = references.fetch_sub(1);
-    assert(oldref > 0);
-  }
-};
-
 /*******************
  * Processor
  */
@@ -875,15 +829,6 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect)
   return 0;
 }
 
-void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) {
-  Mutex::Locker l(deleted_lock);
-  deleted_conns.insert(conn);
-
-  if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
-    local_worker->center.dispatch_event_external(reap_handler);
-  }
-}
-
 void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
 {
   // be careful here: multiple threads may block here, and readers of
@@ -928,7 +873,3 @@ int AsyncMessenger::reap_dead()
 
   return num;
 }
-
-void AsyncMessenger::release_worker(EventCenter* c) {
-  pool->release_worker(c);
-}
index 8e65c14ff55e623b72f468d621e72c31310d97d1..9ac8a1a5c66b86d7c63fd220c60f86ce4dc44eee 100644 (file)
@@ -56,7 +56,51 @@ enum {
 };
 
 
-class Worker;
+class Worker : public Thread {
+  static const uint64_t InitEventNumber = 5000;
+  static const uint64_t EventMaxWaitUs = 30000000;
+  CephContext *cct;
+  WorkerPool *pool;
+  bool done;
+  int id;
+  PerfCounters *perf_logger;
+
+ public:
+  EventCenter center;
+  std::atomic_uint references;
+  Worker(CephContext *c, WorkerPool *p, int i)
+    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
+    center.init(InitEventNumber, i);
+    char name[128];
+    sprintf(name, "AsyncMessenger::Worker-%d", id);
+    // initialize perf_logger
+    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
+
+    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
+    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
+    plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
+    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
+    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
+    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
+    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
+
+    perf_logger = plb.create_perf_counters();
+    cct->get_perfcounters_collection()->add(perf_logger);
+  }
+  ~Worker() {
+    if (perf_logger) {
+      cct->get_perfcounters_collection()->remove(perf_logger);
+      delete perf_logger;
+    }
+  }
+  void *entry();
+  void stop();
+  PerfCounters *get_perf_counter() { return perf_logger; }
+  void release_worker() {
+    int oldref = references.fetch_sub(1);
+    assert(oldref > 0);
+  }
+};
 
 /**
  * If the Messenger binds to a specific address, the Processor runs
@@ -444,7 +488,14 @@ public:
    *
    * See "deleted_conns"
    */
-  void unregister_conn(AsyncConnectionRef conn);
+  void unregister_conn(AsyncConnectionRef conn) {
+    Mutex::Locker l(deleted_lock);
+    deleted_conns.insert(conn);
+
+    if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
+      local_worker->center.dispatch_event_external(reap_handler);
+    }
+  }
 
   /**
    * Reap dead connection from `deleted_conns`