state(STATE_NONE), state_after_send(0), sd(-1),
lock("AsyncConnection::lock"), open_write(false), keepalive(false),
stop_lock("AsyncConnection::stop_lock"),
- got_bad_auth(false), authorizer(NULL),
+ got_bad_auth(false), authorizer(NULL), replacing(false),
state_buffer(4096), state_offset(0), net(cct), center(c)
{
read_handler.reset(new C_handle_read(this));
<< " > " << existing->connect_seq << dendl;
goto replace;
} // existing
- else if (policy.resetcheck && connect.connect_seq > 0) {
+ else if (!replacing && policy.resetcheck && connect.connect_seq > 0) {
// we reset, and they are opening a new session
ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect.connect_seq << "), sending RESETSESSION" << dendl;
if (_reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply) < 0)
goto fail;
- uint64_t s = existing->sd;
- existing->sd = sd;
- sd = s;
+ // The existing connection stays alive; we hand it this connection's
+ // socket so that the original "connection_state" is preserved
+ center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
+
+ swap(existing->sd, sd);
existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+ existing->open_write = false;
+ existing->discard_out_queue();
+ existing->replacing = true;
_stop();
existing->lock.Unlock();
return 0;
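
The handoff above is the core of the replace path: the accepted socket is re-registered against the surviving connection's read handler before the descriptors are swapped, so no readable event can be lost in between. A condensed sketch of the sequence as a hypothetical helper (simplified from the hunk above; locking and error handling omitted):

    void hand_off_socket(AsyncConnection *dying, AsyncConnection *existing,
                         EventCenter *center) {
      // detach the accepted fd from the dying connection's handlers
      center->delete_file_event(dying->sd, EVENT_READABLE|EVENT_WRITABLE);
      // re-attach it to the survivor before swapping, so no event is dropped
      center->create_file_event(dying->sd, EVENT_READABLE, existing->read_handler);
      // the survivor now owns the live socket; the fd left in dying->sd is
      // closed later by its _stop()
      std::swap(existing->sd, dying->sd);
    }
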
existing->lock.Unlock();
open:
+ replacing = false;
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
(*r)->put();
}
out_q.clear();
+ outcoming_bl.clear();
}
int AsyncConnection::randomize_out_seq()
{
ldout(async_msgr->cct, 10) << __func__ << " was_session_reset" << dendl;
discard_out_queue();
- outcoming_bl.clear();
center->dispatch_event_external(remote_reset_handler);
void AsyncConnection::_stop()
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ if (sd >= 0)
+   center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
shutdown_socket();
discard_out_queue();
- outcoming_bl.clear();
open_write = false;
state = STATE_CLOSED;
- ::close(sd);
+ if (sd >= 0)
+   ::close(sd);
sd = -1;
- async_msgr->unregister_conn(peer_addr);
+ async_msgr->unregister_conn(this);
// Here we need to dispatch the "signal" event externally, because we want
// to ensure it fires only after all events triggered by this "_stop" have
// completed.
center->dispatch_event_external(signal_handler);
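
Why dispatch the signal externally instead of calling it inline: external events are appended to a queue that the event loop only drains after the handler that queued them has returned, which provides the ordering the comment above asks for. A minimal self-contained model of that mechanism (illustrative only, not the actual Ceph EventCenter API):

    #include <deque>
    #include <functional>

    struct MiniEventCenter {
      std::deque<std::function<void()>> external;  // pending external events
      void dispatch_event_external(std::function<void()> cb) {
        external.push_back(std::move(cb));         // deferred, FIFO order
      }
      void drain() {  // run by the event loop after in-flight handlers return
        while (!external.empty()) {
          auto cb = std::move(external.front());
          external.pop_front();
          cb();
        }
      }
    };

    int main() {
      MiniEventCenter c;
      bool ran = false;
      c.dispatch_event_external([&] { ran = true; });  // deferred
      c.drain();                                       // now it runs
      return ran ? 0 : 1;
    }
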
ostream& _conn_prefix(std::ostream *_dout);
bool is_connected() {
- // FIXME?
- return state != STATE_CLOSED;
+ Mutex::Locker l(lock);
+ return state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE;
}
// Only call when AsyncConnection is first constructed
void mark_down() {
Mutex::Locker l(stop_lock);
if (center->get_owner() == pthread_self()) {
- _stop();
+ stop();
} else {
center->dispatch_event_external(stop_handler);
stop_cond.Wait(stop_lock);
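
The branch above is a standard cross-thread shutdown pattern: if the caller is already on the event-center thread it can stop directly; otherwise it queues the stop and blocks on the condition until the event thread signals completion. An illustrative, self-contained model of that pattern (not Ceph code):

    #include <condition_variable>
    #include <mutex>

    struct Stopper {
      std::mutex m;
      std::condition_variable cv;
      bool stopped = false;

      void signal_stopped() {            // run on the event thread after stop
        std::lock_guard<std::mutex> l(m);
        stopped = true;
        cv.notify_all();
      }
      void wait_stopped() {              // run on the calling thread
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return stopped; });
      }
    };
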
// Accepting state
entity_addr_t socket_addr;
CryptoKey session_key;
+ bool replacing; // when a replace is in progress, we reply to the connect
+                 // side with a RETRY tag and the accept side tears down the
+                 // replaced connection. By the time the connect side
+                 // reissues its connect_msg there is no conflicting
+                 // connection left, so "replacing" lets us skip RESETSESSION
+                 // and avoid reporting a spurious session reset
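
To make the flag's effect concrete, here is a tiny runnable model of the accept-side decision (hypothetical names, condensed from the handshake hunks above):

    #include <cstdio>

    enum class Reply { RetrySession, ResetSession, Proceed };

    static Reply accept_decision(bool have_existing, bool replacing,
                                 bool resetcheck, unsigned peer_cseq) {
      if (have_existing)
        return Reply::RetrySession;   // replace path: peer will reconnect
      if (!replacing && resetcheck && peer_cseq > 0)
        return Reply::ResetSession;   // we really reset; tell the peer
      return Reply::Proceed;          // covers the reissued connect_msg after
                                      // a replace, because `replacing` is set
    }

    int main() {
      // reissued connect_msg after a replace: RESETSESSION is skipped
      std::printf("%d\n", accept_decision(false, true, true, 5)
                          == Reply::Proceed);  // prints 1
    }
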
// used only for local state; it is overwritten on state transitions
bufferptr state_buffer;
processor(this, _nonce),
lock("AsyncMessenger::lock"),
nonce(_nonce), did_bind(false),
- global_seq(0),
+ global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
cluster_protocol(0), stopped(true)
{
ceph_spin_init(&global_seq_lock);
while (!conns.empty()) {
AsyncConnectionRef p = conns.begin()->second;
- _stop_conn(p);
+ // unregister_conn only queues the connection for lazy deletion now, so
+ // erase the entry here or this loop never terminates
+ conns.erase(conns.begin());
+ p->mark_down();
}
}
lock.Unlock();
AsyncConnectionRef p = _lookup_conn(addr);
if (p) {
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
- _stop_conn(p);
+ p->mark_down();
p->get();
ms_deliver_handle_reset(p.get());
} else {
// FIXME clear up
set<AsyncConnectionRef> accepting_conns;
+ /**
+ * list of connections that have been closed and need cleaning up
+ *
+ * AsyncMessenger and AsyncConnection follow a locking rule:
+ * AsyncMessenger::lock may be taken before AsyncConnection::lock, but
+ * never the reverse. This rule exists to avoid deadlock. So when an
+ * AsyncConnection wants to unregister itself from AsyncMessenger, it
+ * just queues itself in this set and is deleted lazily. "_lookup_conn"
+ * must make sure it never returns an AsyncConnection that is in this set.
+ */
+ Mutex deleted_lock;
+ set<AsyncConnectionRef> deleted_conns;
+
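
The same pattern in miniature, as a standalone sketch (illustrative names, not the Ceph API): the connection side only ever takes the small inner lock, while the owner purges dead entries during lookups it already performs under the outer lock, so the outer-then-inner order is never reversed:

    #include <map>
    #include <memory>
    #include <mutex>
    #include <set>

    struct Conn { int id; };
    using ConnRef = std::shared_ptr<Conn>;

    struct Registry {
      std::mutex lock;             // outer lock (cf. AsyncMessenger::lock)
      std::map<int, ConnRef> conns;
      std::mutex deleted_lock;     // inner lock, protects only `deleted`
      std::set<ConnRef> deleted;

      // called from a connection thread; never touches the outer `lock`
      void unregister(ConnRef c) {
        std::lock_guard<std::mutex> l(deleted_lock);
        deleted.insert(std::move(c));
      }

      // caller already holds the outer `lock`
      ConnRef lookup(int id) {
        auto p = conns.find(id);
        if (p == conns.end())
          return nullptr;
        std::lock_guard<std::mutex> l(deleted_lock);
        if (deleted.erase(p->second)) {  // lazily purge a dead entry
          conns.erase(p);
          return nullptr;
        }
        return p->second;
      }
    };

    int main() {
      Registry r;
      auto c = std::make_shared<Conn>(Conn{1});
      r.conns.emplace(1, c);
      r.unregister(c);                        // queued, not yet removed
      std::lock_guard<std::mutex> g(r.lock);  // lookup's caller holds it
      return r.lookup(1) ? 1 : 0;             // 0: the dead entry was purged
    }
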
/// internal cluster protocol version, if any, for talking to entities of the same type.
int cluster_protocol;
if (p == conns.end())
return NULL;
- assert(p->second->is_connected());
- return p->second;
- }
-
- void _stop_conn(AsyncConnectionRef c) {
- assert(lock.is_locked());
- if (c) {
- c->mark_down();
- conns.erase(c->peer_addr);
+ // lazy delete, see "deleted_conns"
+ Mutex::Locker l(deleted_lock);
+ if (deleted_conns.count(p->second)) {
+ deleted_conns.erase(p->second);
+ conns.erase(p);
+ return NULL;
}
+
+ return p->second;
}
void _init_local_connection() {
ms_deliver_handle_fast_connect(local_connection.get());
}
-
public:
/// con used for sending messages to ourselves
/**
* Unregister connection from `conns`
+ *
+ * See "deleted_conns": this only queues the connection, which is erased
+ * from `conns` lazily by "_lookup_conn"
*/
- void unregister_conn(const entity_addr_t &addr) {
- Mutex::Locker l(lock);
- conns.erase(addr);
+ void unregister_conn(AsyncConnectionRef conn) {
+ Mutex::Locker l(deleted_lock);
+ deleted_conns.insert(conn);
}
/**
* @} // AsyncMessenger Internals