From: Haomai Wang Date: Fri, 27 May 2016 03:31:38 +0000 (+0800) Subject: AsyncMessenger: remove extra release_worker path X-Git-Tag: ses5-milestone5~574^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8907f52db228d0e2825d43fa30c63ede73fb36bc;p=ceph.git AsyncMessenger: remove extra release_worker path Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index f002be603a1..35809c8c116 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -119,9 +119,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) } AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, - EventCenter *c, PerfCounters *p) + Worker *w) : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), - logger(p), global_seq(0), connect_seq(0), peer_global_seq(0), + logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1), dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), @@ -130,7 +130,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu last_active(ceph::coarse_mono_clock::now()), inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), got_bad_auth(false), authorizer(NULL), replacing(false), - is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) + is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), + worker(w), center(&w->center) { read_handler = new C_handle_read(this); write_handler = new C_handle_write(this); @@ -2234,6 +2235,7 @@ void AsyncConnection::_stop() dispatch_queue->discard_queue(conn_id); discard_out_queue(); async_msgr->unregister_conn(this); + worker->release_worker(); state = STATE_CLOSED; open_write = false; @@ -2491,12 +2493,6 @@ void AsyncConnection::mark_down() _stop(); } -void AsyncConnection::release_worker() -{ - if (msgr) - reinterpret_cast(msgr)->release_worker(center); -} - void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { assert(write_lock.is_locked()); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 8bef56c38a0..3f87d6886c6 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -37,6 +37,7 @@ using namespace std; #include "net_handler.h" class AsyncMessenger; +class Worker; static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); @@ -182,7 +183,7 @@ class AsyncConnection : public Connection { } *delay_state; public: - AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, EventCenter *c, PerfCounters *p); + AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w); ~AsyncConnection(); void maybe_start_delay_thread(); @@ -210,8 +211,6 @@ class AsyncConnection : public Connection { policy.lossy = true; } - void release_worker(); - private: enum { STATE_NONE, @@ -364,6 +363,7 @@ class AsyncConnection : public Connection { // used only by "read_until" uint64_t state_offset; NetHandler net; + Worker *worker; EventCenter *center; ceph::shared_ptr session_security; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 1dcd00dcd3d..7b859a0660e 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -90,6 +90,10 @@ class Worker : public Thread { void *entry(); void stop(); PerfCounters *get_perf_counter() { return perf_logger; } + void release_worker() { + int oldref = references.fetch_sub(1); + assert(oldref > 0); + } }; /******************* @@ -357,7 +361,6 @@ class WorkerPool { virtual ~WorkerPool(); void start(); Worker *get_worker(); - void release_worker(EventCenter* c); int get_cpuid(int id) { if (coreids.empty()) return -1; @@ -495,21 +498,6 @@ Worker* WorkerPool::get_worker() return current_best; } -void WorkerPool::release_worker(EventCenter* c) -{ - ldout(cct, 10) << __func__ << dendl; - simple_spin_lock(&pool_spin); - for (auto p = workers.begin(); p != workers.end(); ++p) { - if (&((*p)->center) == c) { - ldout(cct, 10) << __func__ << " found worker, releasing" << dendl; - int oldref = (*p)->references.fetch_sub(1); - assert(oldref > 0); - break; - } - } - simple_spin_unlock(&pool_spin); -} - void WorkerPool::barrier() { ldout(cct, 10) << __func__ << " started." << dendl; @@ -543,8 +531,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, ceph_spin_init(&global_seq_lock); cct->lookup_or_create_singleton_object(pool, WorkerPool::name); local_worker = pool->get_worker(); - local_connection = new AsyncConnection( - cct, this, &dispatch_queue, &local_worker->center, local_worker->get_perf_counter()); + local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker); local_features = features; init_local_connection(); reap_handler = new C_handle_reap(this); @@ -678,7 +665,7 @@ AsyncConnectionRef AsyncMessenger::add_accept(int sd) { lock.Lock(); Worker *w = pool->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter()); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); conn->accept(sd); accepting_conns.insert(conn); lock.Unlock(); @@ -695,7 +682,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int // create connection Worker *w = pool->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter()); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); conn->connect(addr, type); assert(!conns.count(addr)); conns[addr] = conn; @@ -858,10 +845,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr) Connection *AsyncMessenger::create_anon_connection() { Mutex::Locker l(lock); Worker *w = pool->get_worker(); - return new AsyncConnection(cct, - this, - &dispatch_queue, - &w->center, w->get_perf_counter()); + return new AsyncConnection(cct, this, &dispatch_queue, w); } int AsyncMessenger::get_proto_version(int peer_type, bool connect) @@ -893,7 +877,6 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) { Mutex::Locker l(deleted_lock); - conn->release_worker(); deleted_conns.insert(conn); if (deleted_conns.size() >= ReapDeadConnectionThreshold) { diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 61f8f3b76d1..8e65c14ff55 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -454,8 +454,6 @@ public: * See "deleted_conns" */ int reap_dead(); - - void release_worker(EventCenter* c); /** * @} // AsyncMessenger Internals