void AsyncConnection::discard_out_queue()
{
ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
+ assert(write_lock.is_locked());
- Mutex::Locker l(write_lock);
for (list<pair<bufferlist, Message*> >::iterator p = sent.begin(); p != sent.end(); ++p) {
ldout(async_msgr->cct, 20) << __func__ << " discard " << p->second << dendl;
p->second->put();
void AsyncConnection::was_session_reset()
{
ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
+ Mutex::Locker l(write_lock);
discard_out_queue();
center->dispatch_event_external(remote_reset_handler);
connect_seq = 0;
in_seq_acked = 0;
once_ready = false;
+ can_write = 0;
}
void AsyncConnection::_stop()
return ;
ldout(async_msgr->cct, 10) << __func__ << dendl;
+ Mutex::Locker l(write_lock);
if (sd >= 0)
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
async_msgr->unregister_conn(this);
state = STATE_CLOSED;
- Mutex::Locker l(write_lock);
open_write = false;
can_write = 2;
state_offset = 0;