From c0c51e5ce820c8cf92a19a55c290ced6f842eb69 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 12 Mar 2008 13:41:14 -0700 Subject: [PATCH] msgr: rewrote connect and accept based on new protocol spec --- src/include/ceph_fs.h | 12 +- src/msg/SimpleMessenger.cc | 361 +++++++++++++++++++++---------------- src/msg/SimpleMessenger.h | 13 +- 3 files changed, 219 insertions(+), 167 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index e06473860e59d..395d39243b482 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -206,11 +206,13 @@ struct ceph_entity_name { #define CEPH_ENTITY_TYPE_CLIENT 4 #define CEPH_ENTITY_TYPE_ADMIN 5 -#define CEPH_MSGR_TAG_READY 1 /* server -> client + cseq: ready for messages */ -#define CEPH_MSGR_TAG_REJECT 2 /* server -> client + cseq: decline socket */ -#define CEPH_MSGR_TAG_MSG 3 /* message */ -#define CEPH_MSGR_TAG_ACK 4 /* message ack */ -#define CEPH_MSGR_TAG_CLOSE 5 /* closing pipe */ +#define CEPH_MSGR_TAG_READY 1 /* server -> client: ready for messages */ +#define CEPH_MSGR_TAG_RESETSESSION 2 /* server -> client: reset, try again */ +#define CEPH_MSGR_TAG_WAIT 3 /* server -> client: wait for racing incoming connection */ +#define CEPH_MSGR_TAG_RETRY 4 /* server -> client + cseq: try again with higher cseq */ +#define CEPH_MSGR_TAG_CLOSE 5 /* closing pipe */ +#define CEPH_MSGR_TAG_MSG 10 /* message */ +#define CEPH_MSGR_TAG_ACK 11 /* message ack */ /* diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index dd154f99c5f7f..bc179786c0a4e 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -752,23 +752,6 @@ void Rank::mark_down(entity_addr_t addr) #define dout(l) if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")." #define derr(l) if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")." -/* - * we have to be careful about connection races: - * A initiates connection - * B initiates connection - * B accepts A's connection - * A rejects B's connection (or vice-versa) - * - * this is controlled by whether accept uses the new incoming socket - * as the new pipe. two cases: - * old new(incoming) - * connecting connecting -> use socket initiated by lower address - * open connecting - * -> use new socket _only_ if connect_seq matches. that is, the - * peer reconnected subsequent to the current open socket. if - * connect_seq _doesn't_ match, it means that it is an 'old' attempt. - */ - int Rank::Pipe::accept() { dout(10) << "accept" << dendl; @@ -804,93 +787,121 @@ int Rank::Pipe::accept() } dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl; } - - __u32 cseq; - rc = tcp_read(sd, (char*)&cseq, sizeof(cseq)); - if (rc < 0) { - dout(10) << "accept couldn't read connect seq" << dendl; - state = STATE_CLOSED; - return -1; - } - - dout(20) << "accept got connect_seq " << cseq << dendl; - - __u32 myseq = connect_seq = 1; + + __u32 peer_cseq; + connect_seq = 0; + + while (1) { + rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq)); + if (rc < 0) { + dout(10) << "accept couldn't read connect peer_seq" << dendl; + goto fail; + } + dout(20) << "accept got peer_connect_seq " << peer_cseq << dendl; - // register pipe. - rank.lock.Lock(); - { - if (rank.rank_pipe.count(peer_addr) == 0) { - dout(10) << "accept new peer " << peer_addr << dendl; - register_pipe(); - } else { - // hmm! - Pipe *other = rank.rank_pipe[peer_addr]; - other->lock.Lock(); - - dout(10) << "accept got connect_seq " << cseq - << ", existing pipe connect_seq " << other->connect_seq - << " state " << other->state - << dendl; - - // if open race, low addr's pipe "wins". - // otherwise, look at connect_seq - if ((other->state == STATE_CONNECTING && peer_addr < rank.rank_addr) || - (other->state == STATE_OPEN && cseq >= other->connect_seq)) { - dout(10) << "accept already had pipe " << other - << ", but switching to this new one" << dendl; - // switch to this new Pipe - other->state = STATE_CLOSED; - assert(q.empty()); - other->cond.Signal(); - other->unregister_pipe(); - register_pipe(); + rank.lock.Lock(); + + // existing? + if (rank.rank_pipe.count(peer_addr)) { + Pipe *existing = rank.rank_pipe[peer_addr]; + existing->lock.Lock(); + + if (peer_cseq < existing->connect_seq) { + // old attempt, or we sent READY but they didn't get it. + dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq + << " > " << peer_cseq << ", RETRY" << dendl; + existing->lock.Unlock(); + rank.lock.Unlock(); + char tag = CEPH_MSGR_TAG_RETRY; + if (tcp_write(sd, &tag, 1) < 0) + goto fail; + continue; + } + + if ((peer_cseq == existing->connect_seq && peer_addr < rank.rank_addr) || + (peer_cseq > existing->connect_seq)) { + // connection race, incoming wins; or + // reconnect + dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq + << " <= " << peer_cseq << ", replacing" << dendl; + assert(existing->state == STATE_CONNECTING || + existing->state == STATE_WAIT); + existing->state = STATE_CLOSED; + existing->cond.Signal(); + existing->unregister_pipe(); // steal queue and out_seq - out_seq = other->out_seq; - if (!other->sent.empty()) { - out_seq = other->sent.front()->get_seq()-1; - q.splice(q.begin(), other->sent); + out_seq = existing->out_seq; + if (!existing->sent.empty()) { + out_seq = existing->sent.front()->get_seq()-1; + q.splice(q.begin(), existing->sent); } - q.splice(q.end(), other->q); - } - else { - dout(10) << "accept already had pipe " << other - << ", closing this one" << dendl; - myseq = other->connect_seq; - state = STATE_CLOSED; + q.splice(q.end(), existing->q); + + existing->lock.Unlock(); + break; } - other->lock.Unlock(); - } + + if (peer_cseq == existing->connect_seq) { + // connection race, our outgoing wins + dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq + << " == " << peer_cseq << ", sending WAIT" << dendl; + assert(peer_addr > rank.rank_addr); + assert(existing->state == STATE_CONNECTING); + existing->lock.Unlock(); + rank.lock.Unlock(); + + char tag = CEPH_MSGR_TAG_WAIT; + if (tcp_write(sd, &tag, 1) < 0) + goto fail; + continue; + } + + assert(0); + } + + if (peer_cseq > 0) { + // we reset, and are opening a new session + dout(10) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl; + rank.lock.Unlock(); + char tag = CEPH_MSGR_TAG_RESETSESSION; + if (tcp_write(sd, &tag, 1) < 0) + goto fail; + continue; + } else { + // new session + dout(10) << "accept new session" << dendl; + break; + } + assert(0); } + + // okay! + register_pipe(); rank.lock.Unlock(); - char tag; - if (state == STATE_CLOSED) { - dout(10) << "accept closed, sending REJECT tag" << dendl; - tag = CEPH_MSGR_TAG_REJECT; - } else { - dout(10) << "accept sending READY tag" << dendl; - tag = CEPH_MSGR_TAG_READY; - state = STATE_OPEN; - kick_reader_on_join = true; - } + connect_seq = peer_cseq + 1; + dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; - if (tcp_write(sd, &tag, 1) < 0 || - tcp_write(sd, (char*)&myseq, sizeof(myseq)) < 0) { - dout(2) << "accept couldn't send initial tag+seq: " - << strerror(errno) << dendl; - fault(); + // send READY + { + char tag = CEPH_MSGR_TAG_READY; + if (tcp_write(sd, &tag, 1) < 0) + goto fail; } if (state != STATE_CLOSED) { - dout(10) << "accept starting writer, " - << "state=" << state << dendl; + dout(10) << "accept starting writer, " << "state=" << state << dendl; start_writer(); } - dout(20) << "accept done" << dendl; return 0; // success. + + + fail: + state = STATE_CLOSED; + fault(); + return -1; } int Rank::Pipe::connect() @@ -903,10 +914,9 @@ int Rank::Pipe::connect() sd = -1; } __u32 cseq = connect_seq; - __u32 rseq; lock.Unlock(); - + int newsd; char tag = -1; int rc; @@ -915,7 +925,7 @@ int Rank::Pipe::connect() struct msghdr msg; struct iovec msgvec[2]; int msglen; - + // create socket? newsd = ::socket(AF_INET, SOCK_STREAM, 0); if (newsd < 0) { @@ -928,7 +938,6 @@ int Rank::Pipe::connect() myAddr.sin_family = AF_INET; myAddr.sin_addr.s_addr = htonl(INADDR_ANY); myAddr.sin_port = htons( 0 ); - rc = ::bind(newsd, (struct sockaddr *) &myAddr, sizeof(myAddr)); assert(rc>=0); @@ -962,7 +971,7 @@ int Rank::Pipe::connect() goto fail; } - // identify myself, and send open seq + // identify myself, and send initial cseq memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = (char*)&rank.rank_addr; msgvec[0].iov_len = sizeof(rank.rank_addr); @@ -972,47 +981,26 @@ int Rank::Pipe::connect() msg.msg_iovlen = 2; msglen = msgvec[0].iov_len + msgvec[1].iov_len; - if (do_sendmsg(newsd, &msg, msglen)) { - dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl; - goto fail; - } - - dout(20) << "connect wrote self, seq, waiting for tag" << dendl; - - // wait for tag - if (tcp_read(newsd, &tag, 1) < 0 || - tcp_read(newsd, (char*)&rseq, sizeof(rseq)) < 0) { - dout(2) << "connect read tag, seq, " << strerror(errno) << dendl; - goto fail; - } - - dout(20) << "connect got initial tag " << (int)tag << " + seq " << rseq << dendl; + while (1) { + if (do_sendmsg(newsd, &msg, msglen)) { + dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl; + goto fail; + } + dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl; + if (tcp_read(newsd, &tag, 1) < 0) { + dout(2) << "connect read tag, seq, " << strerror(errno) << dendl; + goto fail; + } + dout(20) << "connect got tag " << (int)tag << dendl; - lock.Lock(); + if (tag == CEPH_MSGR_TAG_RESETSESSION) { + lock.Lock(); + if (state != STATE_CONNECTING) { + dout(0) << "connect got RESETSESSION but no longer connecting" << dendl; + goto stop_locked; + } - // FINISH - if (state != STATE_CONNECTING) { - dout(2) << "connect hmm, race durring connect(), not connecting anymore, failing" << dendl; - goto fail_locked; // hmm! - } - if (tag == CEPH_MSGR_TAG_REJECT) { - if (connect_seq != rseq) { - dout(0) << "connect got REJECT, old connect_seq was " << connect_seq - << ", taking new " << rseq << dendl; - connect_seq = rseq; - } else { - dout(10) << "connect got REJECT, connection race (harmless), connect_seq=" << connect_seq << dendl; - } - goto fail_locked; - } - assert(tag == CEPH_MSGR_TAG_READY); - state = STATE_OPEN; - this->sd = newsd; - connect_seq++; - if (rseq != connect_seq) { - dout(0) << "connect REMOTE RESET: my seq = " << connect_seq << ", remote seq = " << rseq << dendl; - if (rseq < connect_seq) { - connect_seq = rseq; + dout(0) << "connect got RESETSESSION" << dendl; report_failures(); for (unsigned i=0; iget_dispatcher()) @@ -1022,25 +1010,83 @@ int Rank::Pipe::connect() for (list::iterator p = q.begin(); p != q.end(); p++) (*p)->set_seq(++out_seq); in_seq = 0; - } else { - dout(0) << "WTF" << dendl; - assert(0); } - } - first_fault = last_attempt = utime_t(); - dout(20) << "connect success " << connect_seq << dendl; + if (tag == CEPH_MSGR_TAG_RETRY) { + int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq)); + if (rc < 0) { + dout(0) << "connect got RETRY tag but couldn't read cseq" << dendl; + goto fail; + } + lock.Lock(); + if (state != STATE_CONNECTING) { + dout(0) << "connect got RETRY, but connection race or something, failing" << dendl; + goto stop_locked; + } + assert(cseq > connect_seq); + dout(10) << "connect got RETRY " << connect_seq << " -> " << cseq << dendl; + connect_seq = cseq; + } - if (!reader_running) { - dout(20) << "connect starting reader" << dendl; - start_reader(); + if (tag == CEPH_MSGR_TAG_RESETSESSION || + tag == CEPH_MSGR_TAG_RETRY) { + // retry + lock.Unlock(); + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = (char*)&cseq; + msgvec[0].iov_len = sizeof(cseq); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + continue; + } + + if (tag == CEPH_MSGR_TAG_WAIT) { + lock.Lock(); + if (state == STATE_CONNECTING) { + dout(3) << "connect got WAIT (connection race), will wait" << dendl; + state = STATE_WAIT; + } else { + dout(3) << "connect got WAIT (connection race), and lo, the wait is already over" << dendl; + } + goto stop_locked; + } + + if (tag == CEPH_MSGR_TAG_READY) { + lock.Lock(); + if (state != STATE_CONNECTING) { + dout(3) << "connect got READY but no longer connecting?" << dendl; + goto stop_locked; + } + + // hooray! + state = STATE_OPEN; + sd = newsd; + connect_seq = cseq+1; + first_fault = last_attempt = utime_t(); + dout(20) << "connect success " << connect_seq << dendl; + + if (!reader_running) { + dout(20) << "connect starting reader" << dendl; + start_reader(); + } + return 0; + } + + // protocol error + dout(0) << "connect got bad tag " << (int)tag << dendl; + goto fail; } - return 0; fail: lock.Lock(); - fail_locked: - if (newsd >= 0) ::close(newsd); - fault(tag == CEPH_MSGR_TAG_REJECT); // quiet if reject (not socket error) + if (state == STATE_CONNECTING) + fault(); + else + dout(3) << "connect fault, but state != connecting, stopping" << dendl; + + stop_locked: + if (newsd >= 0) + ::close(newsd); return -1; } @@ -1296,8 +1342,12 @@ void Rank::Pipe::reader() else if (tag == CEPH_MSGR_TAG_CLOSE) { dout(20) << "reader got CLOSE" << dendl; lock.Lock(); - fault(true); // treat as a fault; i.e. reconnect|close - continue; + if (state == STATE_CLOSING) + state = STATE_CLOSED; + else + state = STATE_CLOSING; + cond.Signal(); + break; } else { dout(0) << "reader bad tag " << (int)tag << dendl; @@ -1324,6 +1374,8 @@ void Rank::Pipe::reader() } rank.lock.Unlock(); } + + dout(10) << "reader done" << dendl; } /* @@ -1345,7 +1397,7 @@ void Rank::Pipe::writer() { lock.Lock(); - while (state != STATE_CLOSED) { + while (state != STATE_CLOSED && state != STATE_WAIT) { // standby? if (!q.empty() && state == STATE_STANDBY) state = STATE_CONNECTING; @@ -1359,11 +1411,11 @@ void Rank::Pipe::writer() if (state == STATE_CLOSING) { // write close tag dout(20) << "writer writing CLOSE tag" << dendl; - char c = CEPH_MSGR_TAG_CLOSE; + char tag = CEPH_MSGR_TAG_CLOSE; + state = STATE_CLOSED; lock.Unlock(); - if (sd) ::write(sd, &c, 1); + if (sd) ::write(sd, &tag, 1); lock.Lock(); - state = STATE_CLOSED; continue; } @@ -1442,6 +1494,8 @@ void Rank::Pipe::writer() } rank.lock.Unlock(); } + + dout(10) << "writer done" << dendl; } @@ -1517,10 +1571,11 @@ Message *Rank::Pipe::read_message() } // unmarshall message + dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from " + << env.src << dendl; + Message *m = decode_message(env, front, data); - dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from " - << m->get_source() << dendl; return m; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 01fb09d577b89..7c866fcbed016 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -87,9 +87,8 @@ private: STATE_OPEN, STATE_STANDBY, STATE_CLOSED, - STATE_CLOSING - //STATE_GOTCLOSE, // got (but haven't sent) a close - //STATE_SENTCLOSE // sent (but haven't got) a close + STATE_CLOSING, + STATE_WAIT // just wait for racing connection }; int sd; @@ -107,7 +106,6 @@ private: utime_t last_attempt; // time of last reconnect attempt bool reader_running; - bool kick_reader_on_join; bool writer_running; list q; @@ -155,7 +153,7 @@ private: Pipe(int st) : sd(-1), state(st), - reader_running(false), kick_reader_on_join(false), writer_running(false), + reader_running(false), writer_running(false), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { } @@ -181,10 +179,7 @@ private: void dirty_close(); void join() { if (writer_thread.is_started()) writer_thread.join(); - if (reader_thread.is_started()) { - //if (kick_reader_on_join) reader_thread.kill(SIGUSR1); - reader_thread.join(); - } + if (reader_thread.is_started()) reader_thread.join(); } void stop(); -- 2.39.5