#define CEPH_MSGR_TAG_READY 1 /* server -> client: ready for messages */
#define CEPH_MSGR_TAG_RESETSESSION 2 /* server -> client: reset, try again */
#define CEPH_MSGR_TAG_WAIT 3 /* server -> client: wait for racing incoming connection */
-#define CEPH_MSGR_TAG_RETRY 4 /* server -> client + cseq: try again with higher cseq */
-#define CEPH_MSGR_TAG_CLOSE 5 /* closing pipe */
+#define CEPH_MSGR_TAG_RETRY_SESSION 4 /* server -> client + cseq: try again with higher cseq */
+#define CEPH_MSGR_TAG_RETRY_GLOBAL 5 /* server -> client + gseq: try again with higher gseq */
+#define CEPH_MSGR_TAG_CLOSE 6 /* closing pipe */
#define CEPH_MSGR_TAG_MSG 10 /* message */
#define CEPH_MSGR_TAG_ACK 11 /* message ack */
/* static tag bytes */
static char tag_ready = CEPH_MSGR_TAG_READY;
static char tag_reset = CEPH_MSGR_TAG_RESETSESSION;
-static char tag_retry = CEPH_MSGR_TAG_RETRY;
+static char tag_retry_session = CEPH_MSGR_TAG_RETRY_SESSION;
+static char tag_retry_global = CEPH_MSGR_TAG_RETRY_GLOBAL;
static char tag_wait = CEPH_MSGR_TAG_WAIT;
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
}
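+/*
+ * Grab the next global connection sequence number, ensuring it is
+ * strictly greater than @gt (e.g. a value advertised by the peer).
+ */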
+static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
+{
+ u32 ret;
+ spin_lock(&msgr->global_seq_lock);
+ if (msgr->global_seq < gt)
+ msgr->global_seq = gt;
+ ret = ++msgr->global_seq;
+ spin_unlock(&msgr->global_seq_lock);
+ return ret;
+}
+
/*
* non-blocking versions
{
con->out_kvec[0].iov_base = &msgr->inst.addr;
con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
- con->out_connect_seq = cpu_to_le32(con->connect_seq);
- con->out_kvec[1].iov_base = &con->out_connect_seq;
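+ /* each connection attempt advertises a fresh global_seq */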
+ con->global_seq = get_global_seq(msgr, 0);
+ con->out_global_seq = cpu_to_le32(con->global_seq);
+ con->out_kvec[1].iov_base = &con->out_global_seq;
con->out_kvec[1].iov_len = 4;
- con->out_kvec_left = 2;
- con->out_kvec_bytes = sizeof(msgr->inst.addr) + 4;
+ con->out_connect_seq = cpu_to_le32(con->connect_seq);
+ con->out_kvec[2].iov_base = &con->out_connect_seq;
+ con->out_kvec[2].iov_len = 4;
+ con->out_kvec_left = 3;
+ con->out_kvec_bytes = sizeof(msgr->inst.addr) + 8;
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
struct ceph_connection *con)
{
con->out_connect_seq = cpu_to_le32(con->connect_seq);
- con->out_kvec[0].iov_base = &con->out_connect_seq;
+ con->out_global_seq = cpu_to_le32(con->global_seq);
+ con->out_kvec[0].iov_base = &con->out_global_seq;
con->out_kvec[0].iov_len = 4;
- con->out_kvec_left = 1;
- con->out_kvec_bytes = 4;
+ con->out_kvec[1].iov_base = &con->out_connect_seq;
+ con->out_kvec[1].iov_len = 4;
+ con->out_kvec_left = 2;
+ con->out_kvec_bytes = 8;
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
set_bit(WRITE_PENDING, &con->state);
}
-static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag)
+static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
+ u32 *pseq)
{
con->out_kvec[0].iov_base = ptag;
con->out_kvec[0].iov_len = 1;
- con->out_connect_seq = cpu_to_le32(con->connect_seq);
- con->out_kvec[1].iov_base = &con->out_connect_seq;
+ con->out_kvec[1].iov_base = pseq;
con->out_kvec[1].iov_len = 4;
con->out_kvec_left = 2;
con->out_kvec_bytes = 1 + 4;
con->in_base_pos += ret;
}
- if (con->in_tag == CEPH_MSGR_TAG_RETRY) {
+ if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
/* peers connect_seq */
to += sizeof(con->in_connect_seq);
if (con->in_base_pos < to) {
con->in_base_pos += ret;
}
}
+ if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
+ /* peers global_seq */
+ to += sizeof(con->in_global_seq);
+ if (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = sizeof(con->in_global_seq) - left;
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&con->in_global_seq +
+ have, left);
+ if (ret <= 0)
+ goto out;
+ con->in_base_pos += ret;
+ }
+ }
ret = 1;
out:
dout(20, "read_connect_partial %p end at %d ret %d\n", con,
prepare_read_connect(con);
con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
break;
- case CEPH_MSGR_TAG_RETRY:
+ case CEPH_MSGR_TAG_RETRY_SESSION:
dout(10,
"process_connect got RETRY my seq = %u, peer_seq = %u\n",
le32_to_cpu(con->out_connect_seq),
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
break;
+ case CEPH_MSGR_TAG_RETRY_GLOBAL:
+ dout(10,
+ "process_connect got RETRY_GLOBAL my gseq = %u, peer_gseq = %u\n",
+ con->global_seq, le32_to_cpu(con->in_global_seq));
+ con->global_seq =
+ get_global_seq(con->msgr,
+ le32_to_cpu(con->in_global_seq));
+ prepare_write_connect_retry(con->msgr, con);
+ prepare_read_connect(con);
+ break;
case CEPH_MSGR_TAG_WAIT:
dout(10, "process_connect peer connecting WAIT\n");
set_bit(WAIT, &con->state);
con->in_base_pos += ret;
}
+ /* global_seq */
+ to += sizeof(con->in_global_seq);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = sizeof(u32) - left;
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&con->in_global_seq + have,
+ left);
+ if (ret <= 0)
+ return ret;
+ con->in_base_pos += ret;
+ }
+
/* connect_seq */
to += sizeof(con->in_connect_seq);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
- int have = sizeof(con->peer_addr) - left;
+ int have = sizeof(u32) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->in_connect_seq + have,
left);
{
struct ceph_connection *existing;
struct ceph_messenger *msgr = con->msgr;
- __u32 peer_cseq = le32_to_cpu(con->in_connect_seq);
+ u32 peer_gseq = le32_to_cpu(con->in_global_seq);
+ u32 peer_cseq = le32_to_cpu(con->in_connect_seq);
/* do we have an existing connection for this peer? */
if (radix_tree_preload(GFP_NOFS) < 0) {
spin_lock(&msgr->con_lock);
existing = __get_connection(msgr, &con->peer_addr);
if (existing) {
- if (test_bit(LOSSY, &existing->state)) {
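+ /* peer's global_seq is out of date; ask it to retry with a newer one */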
+ if (peer_gseq < existing->global_seq) {
+ /* retry_global */
+ con->global_seq = existing->global_seq;
+ con->out_global_seq =
+ cpu_to_le32(con->global_seq);
+ prepare_write_accept_retry(con,
+ &tag_retry_global,
+ &con->out_global_seq);
+ } else if (test_bit(LOSSY, &existing->state)) {
dout(20, "process_accept replacing existing LOSSY %p\n",
existing);
reset_connection(existing);
/* old attempt or peer didn't get the READY */
/* send retry with peers connect seq */
con->connect_seq = existing->connect_seq;
- prepare_write_accept_retry(con, &tag_retry);
+ con->out_connect_seq =
+ cpu_to_le32(con->connect_seq);
+ prepare_write_accept_retry(con,
+ &tag_retry_session,
+ &con->out_connect_seq);
}
} else if (peer_cseq == existing->connect_seq &&
(test_bit(CONNECTING, &existing->state) ||
} else {
dout(20, "process_accept no existing connection, opening\n");
__register_connection(msgr, con);
+ con->global_seq = peer_gseq;
con->connect_seq = peer_cseq + 1;
prepare_write_accept_reply(con, &tag_ready);
}
INIT_LIST_HEAD(&msgr->con_all);
INIT_LIST_HEAD(&msgr->con_accepting);
INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC);
+ spin_lock_init(&msgr->global_seq_lock);
msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
if (!msgr->zero_page) {
struct list_head con_accepting; /* accepting */
struct radix_tree_root con_tree; /* established */
struct page *zero_page;
+ u32 global_seq;
+ spinlock_t global_seq_lock;
};
struct ceph_msg {
struct ceph_entity_addr peer_addr; /* peer address */
struct ceph_entity_name peer_name; /* peer name */
- __u32 connect_seq;
+ __u32 connect_seq, global_seq;
__le32 in_connect_seq, out_connect_seq;
+ __le32 in_global_seq, out_global_seq;
__u32 out_seq; /* last message queued for send */
__u32 in_seq, in_seq_acked; /* last message received, acked */
dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
}
- __u32 peer_cseq;
+ __u32 peer_gseq, peer_cseq;
Pipe *existing = 0;
// this should roughly mirror pseudocode at
// http://ceph.newdream.net/wiki/Messaging_protocol
while (1) {
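+ // each connection attempt now starts with the peer's global seq,
+ // followed by its connect seq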
+ rc = tcp_read(sd, (char*)&peer_gseq, sizeof(peer_gseq));
+ if (rc < 0) {
+ dout(10) << "accept couldn't read connect peer_gseq" << dendl;
+ goto fail;
+ }
rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
if (rc < 0) {
dout(10) << "accept couldn't read connect peer_seq" << dendl;
if (rank.rank_pipe.count(peer_addr)) {
existing = rank.rank_pipe[peer_addr];
existing->lock.Lock();
+
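+ // peer's gseq is older than the one we already have from them:
+ // stale attempt, tell them to retry with a newer gseq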
+ if (peer_gseq < existing->peer_global_seq) {
+ dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+ << " > " << peer_gseq << ", RETRY_GLOBAL" << dendl;
+ __u32 gseq = existing->peer_global_seq; // so we can send it below..
+ existing->lock.Unlock();
+ rank.lock.Unlock();
+ char tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
+ if (tcp_write(sd, &tag, 1) < 0)
+ goto fail;
+ if (tcp_write(sd, (char*)&gseq, sizeof(gseq)) < 0)
+ goto fail;
+ continue;
+ }
if (existing->policy.is_lossy()) {
dout(-10) << "accept replacing existing (lossy) channel" << dendl;
}
if (peer_cseq < existing->connect_seq) {
- if (false &&
- /*
- * FIXME: protocol spec is flawed here. we can't
- * distinguish between a remote reset or a slow remote
- * connect race (where the remote connect arrives _after_
- * our outgoing connection gets a READY reply).
- *
- * BUT, this doesn't happen in practice, yet. the "reset"
- * case comes up in two situations:
- *
- * - mds resets connection to client. it should _never_
- * talk to that client after that, unless the client
- * initiates the connection.
- *
- * - mon restarts. it'll talk to the client. but, the client
- * doesn't need the peer_reset calback in that case. faling into the
- * RETRY case is harmless.
- *
- * blah!
- */
- peer_cseq == 0) {
+ if (peer_cseq == 0) {
dout(10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
goto replace;
} else {
// old attempt, or we sent READY but they didn't get it.
dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
- << " > " << peer_cseq << ", RETRY" << dendl;
- connect_seq = existing->connect_seq; // so we can send it below..
+ << " > " << peer_cseq << ", RETRY_SESSION" << dendl;
+ __u32 cseq = existing->connect_seq; // so we can send it below..
existing->lock.Unlock();
rank.lock.Unlock();
- char tag = CEPH_MSGR_TAG_RETRY;
+ char tag = CEPH_MSGR_TAG_RETRY_SESSION;
if (tcp_write(sd, &tag, 1) < 0)
goto fail;
- if (tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0)
+ if (tcp_write(sd, (char*)&cseq, sizeof(cseq)) < 0)
goto fail;
continue;
}
}
assert(peer_cseq > existing->connect_seq);
+ assert(peer_gseq > existing->peer_global_seq);
if (existing->connect_seq == 0) {
dout(10) << "accept we reset (peer sent cseq " << peer_cseq
<< ", " << existing << ".cseq = " << existing->connect_seq
rank.lock.Unlock();
connect_seq = peer_cseq + 1;
+ peer_global_seq = peer_gseq;
dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
// send READY
sd = -1;
}
__u32 cseq = connect_seq;
+ __u32 gseq = rank.get_global_seq();
lock.Unlock();
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&rank.rank_addr;
msgvec[0].iov_len = sizeof(rank.rank_addr);
- msgvec[1].iov_base = (char*)&cseq;
- msgvec[1].iov_len = sizeof(cseq);
msg.msg_iov = msgvec;
- msg.msg_iovlen = 2;
- msglen = msgvec[0].iov_len + msgvec[1].iov_len;
+ msg.msg_iovlen = 1;
+ msglen = msgvec[0].iov_len;
+ if (do_sendmsg(newsd, &msg, msglen)) {
+ dout(2) << "connect couldn't write self addr, " << strerror(errno) << dendl;
+ goto fail;
+ }
while (1) {
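+ // (re)send our gseq and cseq on every pass through the negotiation loop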
+ memset(&msg, 0, sizeof(msg));
+ msgvec[0].iov_base = (char*)&gseq;
+ msgvec[0].iov_len = sizeof(gseq);
+ msgvec[1].iov_base = (char*)&cseq;
+ msgvec[1].iov_len = sizeof(cseq);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 2;
+ msglen = msgvec[0].iov_len + msgvec[1].iov_len;
+
+ dout(10) << "connect sending gseq " << gseq << " cseq " << cseq << dendl;
if (do_sendmsg(newsd, &msg, msglen)) {
- dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl;
+ dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
goto fail;
}
dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl;
dout(0) << "connect got RESETSESSION" << dendl;
was_session_reset();
+ lock.Unlock();
+ continue;
+ }
+ if (tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
+ int rc = tcp_read(newsd, (char*)&gseq, sizeof(gseq));
+ if (rc < 0) {
+ dout(0) << "connect got RETRY_GLOBAL tag but couldn't read gseq" << dendl;
+ goto fail;
+ }
+ lock.Lock();
+ if (state != STATE_CONNECTING) {
+ dout(0) << "connect got RETRY_GLOBAL, but connection race or something, failing" << dendl;
+ goto stop_locked;
+ }
+ gseq = rank.get_global_seq(gseq);
+ dout(10) << "connect got RETRY_GLOBAL " << gseq << dendl;
+ lock.Unlock();
continue;
}
- if (tag == CEPH_MSGR_TAG_RETRY) {
+ if (tag == CEPH_MSGR_TAG_RETRY_SESSION) {
int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq));
if (rc < 0) {
- dout(0) << "connect got RETRY tag but couldn't read cseq" << dendl;
+ dout(0) << "connect got RETRY_SESSION tag but couldn't read cseq" << dendl;
goto fail;
}
lock.Lock();
if (state != STATE_CONNECTING) {
- dout(0) << "connect got RETRY, but connection race or something, failing" << dendl;
+ dout(0) << "connect got RETRY_SESSION, but connection race or something, failing" << dendl;
goto stop_locked;
}
assert(cseq > connect_seq);
- dout(10) << "connect got RETRY " << connect_seq << " -> " << cseq << dendl;
+ dout(10) << "connect got RETRY_SESSION " << connect_seq << " -> " << cseq << dendl;
connect_seq = cseq;
- }
-
- if (tag == CEPH_MSGR_TAG_RESETSESSION ||
- tag == CEPH_MSGR_TAG_RETRY) {
- // retry
lock.Unlock();
- memset(&msg, 0, sizeof(msg));
- msgvec[0].iov_base = (char*)&cseq;
- msgvec[0].iov_len = sizeof(cseq);
- msg.msg_iov = msgvec;
- msg.msg_iovlen = 1;
- msglen = msgvec[0].iov_len;
continue;
}
list<Message*> sent;
Cond cond;
- __u32 connect_seq;
+ __u32 connect_seq, peer_global_seq;
__u32 out_seq;
__u32 in_seq, in_seq_acked;
sd(-1),
state(st),
reader_running(false), writer_running(false),
- connect_seq(0),
+ connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0),
reader_thread(this), writer_thread(this) { }
//~Pipe() { cout << "destructor on " << this << std::endl; }
set<Pipe*> pipes;
list<Pipe*> pipe_reap_queue;
-
+
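+ // process-wide connection attempt counter; see get_global_seq()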
+ Mutex global_seq_lock;
+ __u32 global_seq;
+
Pipe *connect_rank(const entity_addr_t& addr, const Policy& p);
const entity_addr_t &get_rank_addr() { return rank_addr; }
int start(bool nodaemon = false);
void wait();
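+ // return a global seq strictly greater than both our last value and 'old'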
+ __u32 get_global_seq(__u32 old=0) {
+ Mutex::Locker l(global_seq_lock);
+ if (old > global_seq)
+ global_seq = old;
+ return ++global_seq;
+ }
+
EntityMessenger *register_entity(entity_name_t addr);
void rename_entity(EntityMessenger *ms, entity_name_t newaddr);
void unregister_entity(EntityMessenger *ms);