]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
AsyncMessenger: remove extra release_worker path
authorHaomai Wang <haomai@xsky.com>
Fri, 27 May 2016 03:31:38 +0000 (11:31 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 29 Jun 2016 04:14:29 +0000 (12:14 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index f002be603a163a013a58d9707834c62c4c49616f..35809c8c116b2e7f2af477f322fec2742b312549 100644 (file)
@@ -119,9 +119,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 }
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
-                                 EventCenter *c, PerfCounters *p)
+                                 Worker *w)
   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
-    logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
+    logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
     out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
     dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
@@ -130,7 +130,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
     last_active(ceph::coarse_mono_clock::now()),
     inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
     got_bad_auth(false), authorizer(NULL), replacing(false),
-    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
+    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
+    worker(w), center(&w->center)
 {
   read_handler = new C_handle_read(this);
   write_handler = new C_handle_write(this);
@@ -2234,6 +2235,7 @@ void AsyncConnection::_stop()
   dispatch_queue->discard_queue(conn_id);
   discard_out_queue();
   async_msgr->unregister_conn(this);
+  worker->release_worker();
 
   state = STATE_CLOSED;
   open_write = false;
@@ -2491,12 +2493,6 @@ void AsyncConnection::mark_down()
   _stop();
 }
 
-void AsyncConnection::release_worker()
-{
-  if (msgr)
-    reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center);
-}
-
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 {
   assert(write_lock.is_locked());
index 8bef56c38a05bb4295bf22850a1187df245d2814..3f87d6886c648178dd3d52e708cb9d823851922e 100644 (file)
@@ -37,6 +37,7 @@ using namespace std;
 #include "net_handler.h"
 
 class AsyncMessenger;
+class Worker;
 
 static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
 
@@ -182,7 +183,7 @@ class AsyncConnection : public Connection {
   } *delay_state;
 
  public:
-  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, EventCenter *c, PerfCounters *p);
+  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
   ~AsyncConnection();
   void maybe_start_delay_thread();
 
@@ -210,8 +211,6 @@ class AsyncConnection : public Connection {
     policy.lossy = true;
   }
   
-  void release_worker();
-
  private:
   enum {
     STATE_NONE,
@@ -364,6 +363,7 @@ class AsyncConnection : public Connection {
   // used only by "read_until"
   uint64_t state_offset;
   NetHandler net;
+  Worker *worker;
   EventCenter *center;
   ceph::shared_ptr<AuthSessionHandler> session_security;
 
index 1dcd00dcd3dfedd888528ada940c220edfc3e16a..7b859a0660e43c59ab5395f05b5279168ae5876e 100644 (file)
@@ -90,6 +90,10 @@ class Worker : public Thread {
   void *entry();
   void stop();
   PerfCounters *get_perf_counter() { return perf_logger; }
+  void release_worker() {
+    int oldref = references.fetch_sub(1);
+    assert(oldref > 0);
+  }
 };
 
 /*******************
@@ -357,7 +361,6 @@ class WorkerPool {
   virtual ~WorkerPool();
   void start();
   Worker *get_worker();
-  void release_worker(EventCenter* c);
   int get_cpuid(int id) {
     if (coreids.empty())
       return -1;
@@ -495,21 +498,6 @@ Worker* WorkerPool::get_worker()
   return current_best;
 }
 
-void WorkerPool::release_worker(EventCenter* c)
-{
-  ldout(cct, 10) << __func__ << dendl;
-  simple_spin_lock(&pool_spin);
-  for (auto p = workers.begin(); p != workers.end(); ++p) {
-    if (&((*p)->center) == c) {
-      ldout(cct, 10) << __func__ << " found worker, releasing" << dendl;
-      int oldref = (*p)->references.fetch_sub(1);
-      assert(oldref > 0);
-      break;
-    }
-  }
-  simple_spin_unlock(&pool_spin);
-}
-
 void WorkerPool::barrier()
 {
   ldout(cct, 10) << __func__ << " started." << dendl;
@@ -543,8 +531,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
   ceph_spin_init(&global_seq_lock);
   cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
   local_worker = pool->get_worker();
-  local_connection = new AsyncConnection(
-      cct, this, &dispatch_queue, &local_worker->center, local_worker->get_perf_counter());
+  local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
   local_features = features;
   init_local_connection();
   reap_handler = new C_handle_reap(this);
@@ -678,7 +665,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd)
 {
   lock.Lock();
   Worker *w = pool->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
   conn->accept(sd);
   accepting_conns.insert(conn);
   lock.Unlock();
@@ -695,7 +682,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
 
   // create connection
   Worker *w = pool->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
   conn->connect(addr, type);
   assert(!conns.count(addr));
   conns[addr] = conn;
@@ -858,10 +845,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
 Connection *AsyncMessenger::create_anon_connection() {
   Mutex::Locker l(lock);
   Worker *w = pool->get_worker();
-  return new AsyncConnection(cct,
-                            this,
-                            &dispatch_queue,
-                            &w->center, w->get_perf_counter());
+  return new AsyncConnection(cct, this, &dispatch_queue, w);
 }
 
 int AsyncMessenger::get_proto_version(int peer_type, bool connect)
@@ -893,7 +877,6 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect)
 
 void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) {
   Mutex::Locker l(deleted_lock);
-  conn->release_worker();
   deleted_conns.insert(conn);
 
   if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
index 61f8f3b76d1285c59186a6a117bc673b971e1dce..8e65c14ff55e623b72f468d621e72c31310d97d1 100644 (file)
@@ -454,8 +454,6 @@ public:
    * See "deleted_conns"
    */
   int reap_dead();
-  
-  void release_worker(EventCenter* c);
 
   /**
    * @} // AsyncMessenger Internals