}
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
- EventCenter *c, PerfCounters *p)
+ Worker *w)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
- logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
+ logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
last_active(ceph::coarse_mono_clock::now()),
inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
got_bad_auth(false), authorizer(NULL), replacing(false),
- is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
+ is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
+ worker(w), center(&w->center)
{
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
dispatch_queue->discard_queue(conn_id);
discard_out_queue();
async_msgr->unregister_conn(this);
+ worker->release_worker();
state = STATE_CLOSED;
open_write = false;
_stop();
}
-void AsyncConnection::release_worker()
-{
- if (msgr)
- reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center);
-}
-
void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
{
assert(write_lock.is_locked());
void *entry();
void stop();
PerfCounters *get_perf_counter() { return perf_logger; }
+ void release_worker() {
+ int oldref = references.fetch_sub(1);
+ assert(oldref > 0);
+ }
};
/*******************
virtual ~WorkerPool();
void start();
Worker *get_worker();
- void release_worker(EventCenter* c);
int get_cpuid(int id) {
if (coreids.empty())
return -1;
return current_best;
}
-void WorkerPool::release_worker(EventCenter* c)
-{
- ldout(cct, 10) << __func__ << dendl;
- simple_spin_lock(&pool_spin);
- for (auto p = workers.begin(); p != workers.end(); ++p) {
- if (&((*p)->center) == c) {
- ldout(cct, 10) << __func__ << " found worker, releasing" << dendl;
- int oldref = (*p)->references.fetch_sub(1);
- assert(oldref > 0);
- break;
- }
- }
- simple_spin_unlock(&pool_spin);
-}
-
void WorkerPool::barrier()
{
ldout(cct, 10) << __func__ << " started." << dendl;
ceph_spin_init(&global_seq_lock);
cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
local_worker = pool->get_worker();
- local_connection = new AsyncConnection(
- cct, this, &dispatch_queue, &local_worker->center, local_worker->get_perf_counter());
+ local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
local_features = features;
init_local_connection();
reap_handler = new C_handle_reap(this);
{
lock.Lock();
Worker *w = pool->get_worker();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
conn->accept(sd);
accepting_conns.insert(conn);
lock.Unlock();
// create connection
Worker *w = pool->get_worker();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, &w->center, w->get_perf_counter());
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
conn->connect(addr, type);
assert(!conns.count(addr));
conns[addr] = conn;
Connection *AsyncMessenger::create_anon_connection() {
Mutex::Locker l(lock);
Worker *w = pool->get_worker();
- return new AsyncConnection(cct,
- this,
- &dispatch_queue,
- &w->center, w->get_perf_counter());
+ return new AsyncConnection(cct, this, &dispatch_queue, w);
}
int AsyncMessenger::get_proto_version(int peer_type, bool connect)
void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) {
Mutex::Locker l(deleted_lock);
- conn->release_worker();
deleted_conns.insert(conn);
if (deleted_conns.size() >= ReapDeadConnectionThreshold) {