int Pipe::accept()
{
ldout(msgr->cct,10) << "accept" << dendl;
+ assert(pipe_lock.is_locked());
+ assert(state == STATE_ACCEPTING);
- set_socket_options();
+ pipe_lock.Unlock();
- // my creater gave me sd via accept()
- assert(state == STATE_ACCEPTING);
-
// vars
bufferlist addrs;
entity_addr_t socket_addr;
// used for reading in the remote acked seq on connect
uint64_t newly_acked_seq = 0;
+ set_socket_options();
+
// announce myself.
r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
if (r < 0) {
goto fail_unlocked;
}
-
authorizer.clear();
if (connect.authorizer_len) {
bp = buffer::create(connect.authorizer_len);
<< dendl;
msgr->lock.Lock(); // FIXME
+ pipe_lock.Lock();
if (msgr->dispatch_queue.stop)
goto shutting_down;
+ if (state != STATE_ACCEPTING) {
+ goto shutting_down;
+ }
// note peer's type, flags
set_peer_type(connect.host_type);
ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
+ pipe_lock.Unlock();
msgr->lock.Lock();
+ pipe_lock.Lock();
if (msgr->dispatch_queue.stop)
goto shutting_down;
-
+ if (state != STATE_ACCEPTING)
+ goto shutting_down;
// existing?
existing = msgr->_lookup_pipe(peer_addr);
if (existing) {
- existing->pipe_lock.Lock();
+ existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
if (connect.global_seq < existing->peer_global_seq) {
ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
assert(0);
retry_session:
+ assert(existing->pipe_lock.is_locked());
+ assert(pipe_lock.is_locked());
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
reply.connect_seq = existing->connect_seq + 1;
existing->pipe_lock.Unlock();
goto reply;
reply:
+ assert(pipe_lock.is_locked());
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
+ pipe_lock.Unlock();
r = tcp_write((char*)&reply, sizeof(reply));
if (r < 0)
goto fail_unlocked;
}
replace:
+ assert(existing->pipe_lock.is_locked());
+ assert(pipe_lock.is_locked());
if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
reply_tag = CEPH_MSGR_TAG_SEQ;
existing_seq = existing->in_seq;
open:
// open
+ assert(pipe_lock.is_locked());
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
assert(state == STATE_ACCEPTING);
assert(removed == 1);
register_pipe();
msgr->lock.Unlock();
+ pipe_lock.Unlock();
r = tcp_write((char*)&reply, sizeof(reply));
if (r < 0) {
start_writer();
}
ldout(msgr->cct,20) << "accept done" << dendl;
- pipe_lock.Unlock();
maybe_start_delay_thread();
if (queued || replaced)
start_writer();
}
- pipe_lock.Unlock();
return -1;
shutting_down:
+ assert(pipe_lock.is_locked());
msgr->lock.Unlock();
if (msgr->cct->_conf->ms_inject_internal_delays) {
t.sleep();
}
- pipe_lock.Lock();
state = STATE_CLOSED;
state_closed.set(1);
fault();
- pipe_lock.Unlock();
return -1;
}
*/
void Pipe::reader()
{
- if (state == STATE_ACCEPTING)
- accept();
-
pipe_lock.Lock();
+ if (state == STATE_ACCEPTING) {
+ accept();
+ assert(pipe_lock.is_locked());
+ }
+
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {