// this should roughly mirror pseudocode at
// http://ceph.newdream.net/wiki/Messaging_protocol
-
+ int reply_tag = 0;
+ bool replace = false;
while (1) {
rc = tcp_read(sd, (char*)&connect, sizeof(connect));
if (rc < 0) {
}
replace:
+ replace = true;
+ if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
+ reply_tag = CEPH_MSGR_TAG_SEQ;
+ }
dout(10) << "accept replacing " << existing << dendl;
existing->stop();
existing->unregister_pipe();
- // steal queue and out_seq
- existing->requeue_sent();
- out_seq = existing->out_seq;
- in_seq = existing->in_seq;
- dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl;
- for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
- p != existing->out_q.end();
- p++)
- out_q[p->first].splice(out_q[p->first].begin(), p->second);
-
//set ourself to take over other Connection, for older messages
existing->connection_state->clear_pipe();
existing->connection_state->pipe = get();
existing->connection_state->put();
existing->connection_state = NULL;
- existing->pipe_lock.Unlock();
open:
// open
dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
// send READY reply
- reply.tag = CEPH_MSGR_TAG_READY;
+ reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
reply.features = policy.features_supported;
reply.global_seq = messenger->get_global_seq();
reply.connect_seq = connect_seq;
goto fail_unlocked;
}
+ if (replace) {
+ uint64_t newly_acked_seq = 0;
+ if (reply_tag == CEPH_MSGR_TAG_SEQ) {
+ if(tcp_write(sd, (char*)&existing->in_seq, sizeof(existing->in_seq)) < 0) {
+ dout(2) << "accept write error on in_seq" << dendl;
+ goto fail_unlocked;
+ }
+ if(tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
+ dout(2) << "accept read error on newly_acked_seq" << dendl;
+ goto fail_unlocked;
+ }
+ }
+ // steal queue and out_seq
+ existing->requeue_sent(newly_acked_seq);
+ out_seq = existing->out_seq;
+ in_seq = existing->in_seq;
+ in_seq_acked = in_seq;
+ dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl;
+ for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
+ p != existing->out_q.end();
+ p++)
+ out_q[p->first].splice(out_q[p->first].begin(), p->second);
+
+ existing->pipe_lock.Unlock();
+ }
+
pipe_lock.Lock();
if (state != STATE_CLOSED) {
dout(10) << "accept starting writer, " << "state=" << state << dendl;
goto stop_locked;
}
- if (reply.tag == CEPH_MSGR_TAG_READY) {
+ if (reply.tag == CEPH_MSGR_TAG_READY ||
+ reply.tag == CEPH_MSGR_TAG_SEQ) {
uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
if (feat_missing) {
dout(1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
goto fail_locked;
}
+ if (reply.tag == CEPH_MSGR_TAG_SEQ) {
+ dout(10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
+ uint64_t newly_acked_seq = 0;
+ if (tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
+ dout(2) << "connect read error on newly_acked_seq" << dendl;
+ goto fail_locked;
+ }
+ handle_ack(newly_acked_seq);
+ if (tcp_write(sd, (char*)&in_seq, sizeof(in_seq)) < 0) {
+ dout(2) << "connect write error on in_seq" << dendl;
+ goto fail_locked;
+ }
+ }
+
// hooray!
peer_global_seq = reply.global_seq;
policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault(false, true);
} else if (state != STATE_CLOSED) {
- dout(15) << "reader got ack seq " << seq << dendl;
- // trim sent list
- while (!sent.empty() &&
- sent.front()->get_seq() <= seq) {
- Message *m = sent.front();
- sent.pop_front();
- dout(10) << "reader got ack seq "
- << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
- m->put();
- }
+ handle_ack(seq);
}
continue;
}