}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
+}
+
+void Pipe::maybe_start_delay_thread()
+{
if (!delay_thread &&
msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) {
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
delay_thread = new DelayedDelivery(this);
delay_thread->create();
- } else
- lsubdout(msgr->cct, ms, 1) << "Pipe " << this << " peer is " << ceph_entity_type_name(connection_state->peer_type)
- << "; NOT injecting delays because it does not match "
- << msgr->cct->_conf->ms_inject_delay_type << dendl;
+ }
}
void Pipe::start_writer()
}
ldout(msgr->cct,20) << "accept done" << dendl;
pipe_lock.Unlock();
+
+ maybe_start_delay_thread();
+
return 0; // success.
fail_unlocked:
ldout(msgr->cct,20) << "connect starting reader" << dendl;
start_reader();
}
+ maybe_start_delay_thread();
delete authorizer;
return 0;
}