existing->unregister_pipe();
replaced = true;
- if (!existing->policy.lossy) {
+ if (existing->policy.lossy) {
+ // disconnect from the Connection
+ assert(existing->connection_state);
+ if (existing->connection_state->clear_pipe(existing))
+ msgr->dispatch_queue.queue_reset(existing->connection_state.get());
+ } else {
// queue a reset on the old connection
msgr->dispatch_queue.queue_reset(connection_state.get());
connection_state = existing->connection_state;
// make existing Connection reference us
- existing->connection_state->reset_pipe(this);
+ connection_state->reset_pipe(this);
// flush/queue any existing delayed messages
if (existing->delay_thread)
p->pipe_lock.Lock();
p->discard_out_queue();
if (p->connection_state) {
- // mark_down, mark_down_all, or fault() should have done this, but make sure!
+ // mark_down, mark_down_all, or fault() should have done this,
+ // or accept() may have switch the Connection to a different
+ // Pipe... but make sure!
bool cleared = p->connection_state->clear_pipe(p);
assert(!cleared);
}