AsyncConnection::~AsyncConnection()
{
+ assert(out_q.empty());
+ assert(sent.empty());
assert(!authorizer);
if (recv_buf)
delete recv_buf;
goto fail;
}
+ if (existing == this)
+ existing = NULL;
if (existing) {
if (connect.global_seq < existing->peer_global_seq) {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
// Here we use "_stop" instead of "mark_down" because "mark_down" is a async
// operation, but now we need ensure all variables in `existing` is cleaned up
// and we will reuse it next.
- existing->_stop();
+ existing->_stop(true);
if (existing->policy.lossy) {
// disconnect from the Connection
center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing)));
+ existing->discard_out_queue();
} else {
// queue a reset on the new connection, which we're dumping for the old
center->dispatch_event_external(reset_handler);
swap(existing->sd, sd);
existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
existing->open_write = false;
- existing->discard_out_queue();
existing->replacing = true;
_stop();
existing->lock.Unlock();
if (policy.lossy && state != STATE_CONNECTING) {
ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
center->dispatch_event_external(reset_handler);
+ discard_out_queue();
_stop();
return ;
}
}
// *note: `async` is true only happen when replacing connection process
-void AsyncConnection::_stop()
+void AsyncConnection::_stop(bool replacing)
{
assert(lock.is_locked());
ldout(async_msgr->cct, 10) << __func__ << dendl;
if (sd > 0)
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
- async_msgr->unregister_conn(this);
+ if (!replacing) {
+ discard_out_queue();
+ async_msgr->unregister_conn(this);
+ }
if (async_msgr->cct->_conf->ms_inject_internal_delays) {
ldout(msgr->cct, 10) << __func__ << " sleep for "
}
shutdown_socket();
- discard_out_queue();
open_write = false;
state_offset = 0;
state = STATE_CLOSED;
int read_until(uint64_t needed, char *p);
int _process_connection();
void _connect();
- void _stop();
+ void _stop(bool discard=true);
int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
void was_session_reset();
int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
{
ldout(cct, 20) << __func__ << " add event fd=" << fd << " cur_mask=" << cur_mask
- << " add_mask=" << add_mask << dendl;
+ << " add_mask=" << add_mask << " to " << epfd << dendl;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(epfd, op, fd, &ee) == -1) {
- lderr(cct) << __func__ << " unable to add event: "
- << cpp_strerror(errno) << dendl;
+ lderr(cct) << __func__ << " epoll_ctl: add fd=" << fd << " failed. "
+ << cpp_strerror(errno) << dendl;
return -errno;
}
void EpollDriver::del_event(int fd, int cur_mask, int delmask)
{
ldout(cct, 20) << __func__ << " del event fd=" << fd << " cur_mask=" << cur_mask
- << " delmask=" << delmask << dendl;
+ << " delmask=" << delmask << " to " << epfd << dendl;
struct epoll_event ee;
int mask = cur_mask & (~delmask);