void SimpleMessenger::submit_message(Message *m, Pipe *pipe)
{
+ assert(pipe->msgr == this);
lock.Lock();
if (pipe == dispatch_queue.local_pipe) {
ldout(cct,20) << "submit_message " << *m << " local" << dendl;
SimpleMessenger::Pipe *pipe = (SimpleMessenger::Pipe *)con->get_pipe();
if (pipe) {
ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
+ assert(pipe->msgr == this);
pipe->pipe_lock.Lock();
pipe->_send_keepalive();
pipe->pipe_lock.Unlock();
Pipe *p = (Pipe *)con->get_pipe();
if (p) {
ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
+ assert(p->msgr == this);
p->unregister_pipe();
p->pipe_lock.Lock();
p->stop();
lock.Lock();
Pipe *p = (Pipe *)con->get_pipe();
if (p) {
+ assert(p->msgr == this);
p->pipe_lock.Lock();
p->unregister_pipe();
if (p->out_q.empty()) {
Pipe *p = (Pipe *)con->get_pipe();
if (p) {
ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
+ assert(p->msgr == this);
p->pipe_lock.Lock();
p->policy.lossy = true;
p->disposable = true;