state(STATE_NONE), state_after_send(0), sd(-1),
lock("AsyncConnection::lock"), open_write(false), keepalive(false),
stop_lock("AsyncConnection::stop_lock"),
- got_bad_auth(false), authorizer(NULL), replacing(false),
+ got_bad_auth(false), authorizer(NULL), replacing(false), stopping(0),
state_buffer(4096), state_offset(0), net(cct), center(c)
{
read_handler.reset(new C_handle_read(this));
}
ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
+ // TODO: it's tricky that exit loop if exist AsyncMessenger waiting for
+ // mark_down. Otherwise, it will be deadlock while
+ // AsyncMessenger::mark_down_all already hold lock.
+ if (stopping.read())
+ break;
async_msgr->learned_addr(peer_addr_for_me);
::encode(async_msgr->get_myaddr(), myaddrbl);
r = _try_send(myaddrbl);
if (center->get_owner() == pthread_self()) {
stop();
} else {
+ stopping.set(1);
center->dispatch_event_external(stop_handler);
stop_cond.Wait(stop_lock);
}
// there won't exists conflicting connection so we use
// "replacing" to skip RESETSESSION to avoid detect wrong
// presentation
+ atomic_t stopping;
// used only for local state, it will be overwrite when state transition
bufferptr state_buffer;
: SimplePolicyMessenger(cct, name,mname, _nonce),
processor(this, _nonce),
lock("AsyncMessenger::lock"),
- nonce(_nonce), did_bind(false),
+ nonce(_nonce), need_addr(true), did_bind(false),
global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
cluster_protocol(0), stopped(true)
{
ldout(cct, 10) << __func__ << ": closing connections" << dendl;
while (!conns.empty()) {
- AsyncConnectionRef p = conns.begin()->second;
+ ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+ AsyncConnectionRef p = it->second;
+ ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl;
+ conns.erase(it);
p->mark_down();
+ ms_deliver_handle_reset(p.get());
}
}
lock.Unlock();
// this always goes from true -> false under the protection of the
// mutex. if it is already false, we need not retake the mutex at
// all.
+ if (!need_addr)
+ return ;
lock.Lock();
- entity_addr_t t = peer_addr_for_me;
- t.set_port(my_inst.addr.get_port());
- my_inst.addr.addr = t.addr;
- ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
- _init_local_connection();
+ if (need_addr) {
+ need_addr = false;
+ entity_addr_t t = peer_addr_for_me;
+ t.set_port(my_inst.addr.get_port());
+ my_inst.addr.addr = t.addr;
+ ldout(cct, 1) << __func__ << " learned my addr " << my_inst.addr << dendl;
+ _init_local_connection();
+ }
lock.Unlock();
}