From 787a6780f00f2ee2b63e202fd689d2adfe5e978d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 30 Jun 2008 22:23:19 -0700 Subject: [PATCH] msgr: add global_seq to disambiguate reset and slow connect --- src/include/ceph_fs.h | 5 +- src/kernel/messenger.c | 104 ++++++++++++++++++++++++++++------ src/kernel/messenger.h | 5 +- src/msg/SimpleMessenger.cc | 112 ++++++++++++++++++++++--------------- src/msg/SimpleMessenger.h | 16 +++++- 5 files changed, 173 insertions(+), 69 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 9d1c5d84ccf6..d1149aff1dc2 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -336,8 +336,9 @@ struct ceph_entity_name { #define CEPH_MSGR_TAG_READY 1 /* server -> client: ready for messages */ #define CEPH_MSGR_TAG_RESETSESSION 2 /* server -> client: reset, try again */ #define CEPH_MSGR_TAG_WAIT 3 /* server -> client: wait for racing incoming connection */ -#define CEPH_MSGR_TAG_RETRY 4 /* server -> client + cseq: try again with higher cseq */ -#define CEPH_MSGR_TAG_CLOSE 5 /* closing pipe */ +#define CEPH_MSGR_TAG_RETRY_SESSION 4 /* server -> client + cseq: try again with higher cseq */ +#define CEPH_MSGR_TAG_RETRY_GLOBAL 5 /* server -> client + gseq: try again with higher gseq */ +#define CEPH_MSGR_TAG_CLOSE 6 /* closing pipe */ #define CEPH_MSGR_TAG_MSG 10 /* message */ #define CEPH_MSGR_TAG_ACK 11 /* message ack */ diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index c81798882572..9f3c61fc5857 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -16,7 +16,8 @@ int ceph_debug_msgr; /* static tag bytes */ static char tag_ready = CEPH_MSGR_TAG_READY; static char tag_reset = CEPH_MSGR_TAG_RESETSESSION; -static char tag_retry = CEPH_MSGR_TAG_RETRY; +static char tag_retry_session = CEPH_MSGR_TAG_RETRY_SESSION; +static char tag_retry_global = CEPH_MSGR_TAG_RETRY_GLOBAL; static char tag_wait = CEPH_MSGR_TAG_WAIT; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -614,6 +615,17 @@ static void ceph_fault(struct ceph_connection *con) } +static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) +{ + u32 ret; + spin_lock(&msgr->global_seq_lock); + if (msgr->global_seq < gt) + msgr->global_seq = gt; + ret = ++msgr->global_seq; + spin_unlock(&msgr->global_seq_lock); + return ret; +} + /* * non-blocking versions @@ -804,11 +816,15 @@ static void prepare_write_connect(struct ceph_messenger *msgr, { con->out_kvec[0].iov_base = &msgr->inst.addr; con->out_kvec[0].iov_len = sizeof(msgr->inst.addr); - con->out_connect_seq = cpu_to_le32(con->connect_seq); - con->out_kvec[1].iov_base = &con->out_connect_seq; + con->global_seq = get_global_seq(msgr, 0); + con->out_global_seq = cpu_to_le32(con->global_seq); + con->out_kvec[1].iov_base = &con->out_global_seq; con->out_kvec[1].iov_len = 4; - con->out_kvec_left = 2; - con->out_kvec_bytes = sizeof(msgr->inst.addr) + 4; + con->out_connect_seq = cpu_to_le32(con->connect_seq); + con->out_kvec[2].iov_base = &con->out_connect_seq; + con->out_kvec[2].iov_len = 4; + con->out_kvec_left = 3; + con->out_kvec_bytes = sizeof(msgr->inst.addr) + 8; con->out_kvec_cur = con->out_kvec; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); @@ -818,10 +834,12 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr, struct ceph_connection *con) { con->out_connect_seq = cpu_to_le32(con->connect_seq); - con->out_kvec[0].iov_base = &con->out_connect_seq; + con->out_kvec[0].iov_base = &con->out_global_seq; con->out_kvec[0].iov_len = 4; - con->out_kvec_left = 1; - con->out_kvec_bytes = 4; + con->out_kvec[1].iov_base = &con->out_connect_seq; + con->out_kvec[1].iov_len = 4; + con->out_kvec_left = 2; + con->out_kvec_bytes = 8; con->out_kvec_cur = con->out_kvec; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); @@ -850,12 +868,12 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag) set_bit(WRITE_PENDING, &con->state); } -static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag) +static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag, + u32 *pseq) { con->out_kvec[0].iov_base = ptag; con->out_kvec[0].iov_len = 1; - con->out_connect_seq = cpu_to_le32(con->connect_seq); - con->out_kvec[1].iov_base = &con->out_connect_seq; + con->out_kvec[1].iov_base = pseq; con->out_kvec[1].iov_len = 4; con->out_kvec_left = 2; con->out_kvec_bytes = 1 + 4; @@ -1190,7 +1208,7 @@ static int read_connect_partial(struct ceph_connection *con) con->in_base_pos += ret; } - if (con->in_tag == CEPH_MSGR_TAG_RETRY) { + if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) { /* peers connect_seq */ to += sizeof(con->in_connect_seq); if (con->in_base_pos < to) { @@ -1204,6 +1222,20 @@ static int read_connect_partial(struct ceph_connection *con) con->in_base_pos += ret; } } + if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { + /* peers global_seq */ + to += sizeof(con->in_global_seq); + if (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = sizeof(con->in_global_seq) - left; + ret = ceph_tcp_recvmsg(con->sock, + (char *)&con->in_global_seq + + have, left); + if (ret <= 0) + goto out; + con->in_base_pos += ret; + } + } ret = 1; out: dout(20, "read_connect_partial %p end at %d ret %d\n", con, @@ -1259,7 +1291,7 @@ static int process_connect(struct ceph_connection *con) prepare_read_connect(con); con->msgr->peer_reset(con->msgr->parent, &con->peer_name); break; - case CEPH_MSGR_TAG_RETRY: + case CEPH_MSGR_TAG_RETRY_SESSION: dout(10, "process_connect got RETRY my seq = %u, peer_seq = %u\n", le32_to_cpu(con->out_connect_seq), @@ -1268,6 +1300,16 @@ static int process_connect(struct ceph_connection *con) prepare_write_connect_retry(con->msgr, con); prepare_read_connect(con); break; + case CEPH_MSGR_TAG_RETRY_GLOBAL: + dout(10, + "process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n", + con->global_seq, le32_to_cpu(con->in_global_seq)); + con->global_seq = + get_global_seq(con->msgr, + le32_to_cpu(con->in_global_seq)); + prepare_write_connect_retry(con->msgr, con); + prepare_read_connect(con); + break; case CEPH_MSGR_TAG_WAIT: dout(10, "process_connect peer connecting WAIT\n"); set_bit(WAIT, &con->state); @@ -1307,11 +1349,24 @@ static int read_accept_partial(struct ceph_connection *con) con->in_base_pos += ret; } + /* global_seq */ + to += sizeof(con->in_global_seq); + while (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = sizeof(u32) - left; + ret = ceph_tcp_recvmsg(con->sock, + (char *)&con->in_global_seq + have, + left); + if (ret <= 0) + return ret; + con->in_base_pos += ret; + } + /* connect_seq */ to += sizeof(con->in_connect_seq); while (con->in_base_pos < to) { int left = to - con->in_base_pos; - int have = sizeof(con->peer_addr) - left; + int have = sizeof(u32) - left; ret = ceph_tcp_recvmsg(con->sock, (char *)&con->in_connect_seq + have, left); @@ -1358,7 +1413,8 @@ static void process_accept(struct ceph_connection *con) { struct ceph_connection *existing; struct ceph_messenger *msgr = con->msgr; - __u32 peer_cseq = le32_to_cpu(con->in_connect_seq); + u32 peer_gseq = le32_to_cpu(con->in_global_seq); + u32 peer_cseq = le32_to_cpu(con->in_connect_seq); /* do we have an existing connection for this peer? */ if (radix_tree_preload(GFP_NOFS) < 0) { @@ -1368,7 +1424,15 @@ static void process_accept(struct ceph_connection *con) spin_lock(&msgr->con_lock); existing = __get_connection(msgr, &con->peer_addr); if (existing) { - if (test_bit(LOSSY, &existing->state)) { + if (peer_gseq < existing->global_seq) { + /* retry_global */ + con->global_seq = existing->global_seq; + con->out_global_seq = + cpu_to_le32(con->global_seq); + prepare_write_accept_retry(con, + &tag_retry_global, + &con->out_global_seq); + } else if (test_bit(LOSSY, &existing->state)) { dout(20, "process_accept replacing existing LOSSY %p\n", existing); reset_connection(existing); @@ -1385,7 +1449,11 @@ static void process_accept(struct ceph_connection *con) /* old attempt or peer didn't get the READY */ /* send retry with peers connect seq */ con->connect_seq = existing->connect_seq; - prepare_write_accept_retry(con, &tag_retry); + con->out_connect_seq = + cpu_to_le32(con->connect_seq); + prepare_write_accept_retry(con, + &tag_retry_session, + &con->out_connect_seq); } } else if (peer_cseq == existing->connect_seq && (test_bit(CONNECTING, &existing->state) || @@ -1419,6 +1487,7 @@ static void process_accept(struct ceph_connection *con) } else { dout(20, "process_accept no existing connection, opening\n"); __register_connection(msgr, con); + con->global_seq = peer_gseq; con->connect_seq = peer_cseq + 1; prepare_write_accept_reply(con, &tag_ready); } @@ -1629,6 +1698,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) INIT_LIST_HEAD(&msgr->con_all); INIT_LIST_HEAD(&msgr->con_accepting); INIT_RADIX_TREE(&msgr->con_tree, GFP_ATOMIC); + spin_lock_init(&msgr->global_seq_lock); msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); if (!msgr->zero_page) { diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 3ccc80eaa6ca..33c4a250a3b8 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -51,6 +51,8 @@ struct ceph_messenger { struct list_head con_accepting; /* accepting */ struct radix_tree_root con_tree; /* established */ struct page *zero_page; + u32 global_seq; + spinlock_t global_seq_lock; }; struct ceph_msg { @@ -101,8 +103,9 @@ struct ceph_connection { struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ - __u32 connect_seq; + __u32 connect_seq, global_seq; __le32 in_connect_seq, out_connect_seq; + __le32 in_global_seq, out_global_seq; __u32 out_seq; /* last message queued for send */ __u32 in_seq, in_seq_acked; /* last message received, acked */ diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index ac73f74a0c1a..a2e218ed986e 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -770,13 +770,18 @@ int Rank::Pipe::accept() dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl; } - __u32 peer_cseq; + __u32 peer_gseq, peer_cseq; Pipe *existing = 0; // this should roughly mirror pseudocode at // http://ceph.newdream.net/wiki/Messaging_protocol while (1) { + rc = tcp_read(sd, (char*)&peer_gseq, sizeof(peer_gseq)); + if (rc < 0) { + dout(10) << "accept couldn't read connect peer_gseq" << dendl; + goto fail; + } rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq)); if (rc < 0) { dout(10) << "accept couldn't read connect peer_seq" << dendl; @@ -790,6 +795,20 @@ int Rank::Pipe::accept() if (rank.rank_pipe.count(peer_addr)) { existing = rank.rank_pipe[peer_addr]; existing->lock.Lock(); + + if (peer_gseq < existing->peer_global_seq) { + dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq + << " > " << peer_gseq << ", RETRY_GLOBAL" << dendl; + __u32 gseq = existing->peer_global_seq; // so we can send it below.. + existing->lock.Unlock(); + rank.lock.Unlock(); + char tag = CEPH_MSGR_TAG_RETRY_GLOBAL; + if (tcp_write(sd, &tag, 1) < 0) + goto fail; + if (tcp_write(sd, (char*)&gseq, sizeof(gseq)) < 0) + goto fail; + continue; + } if (existing->policy.is_lossy()) { dout(-10) << "accept replacing existing (lossy) channel" << dendl; @@ -798,41 +817,21 @@ int Rank::Pipe::accept() } if (peer_cseq < existing->connect_seq) { - if (false && - /* - * FIXME: protocol spec is flawed here. we can't - * distinguish between a remote reset or a slow remote - * connect race (where the remote connect arrives _after_ - * our outgoing connection gets a READY reply). - * - * BUT, this doesn't happen in practice, yet. the "reset" - * case comes up in two situations: - * - * - mds resets connection to client. it should _never_ - * talk to that client after that, unless the client - * initiates the connection. - * - * - mon restarts. it'll talk to the client. but, the client - * doesn't need the peer_reset calback in that case. faling into the - * RETRY case is harmless. - * - * blah! - */ - peer_cseq == 0) { + if (peer_cseq == 0) { dout(10) << "accept peer reset, then tried to connect to us, replacing" << dendl; existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s goto replace; } else { // old attempt, or we sent READY but they didn't get it. dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq - << " > " << peer_cseq << ", RETRY" << dendl; - connect_seq = existing->connect_seq; // so we can send it below.. + << " > " << peer_cseq << ", RETRY_SESSION" << dendl; + __u32 cseq = existing->connect_seq; // so we can send it below.. existing->lock.Unlock(); rank.lock.Unlock(); - char tag = CEPH_MSGR_TAG_RETRY; + char tag = CEPH_MSGR_TAG_RETRY_SESSION; if (tcp_write(sd, &tag, 1) < 0) goto fail; - if (tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0) + if (tcp_write(sd, (char*)&cseq, sizeof(cseq)) < 0) goto fail; continue; } @@ -864,6 +863,7 @@ int Rank::Pipe::accept() } assert(peer_cseq > existing->connect_seq); + assert(peer_gseq > existing->peer_global_seq); if (existing->connect_seq == 0) { dout(10) << "accept we reset (peer sent cseq " << peer_cseq << ", " << existing << ".cseq = " << existing->connect_seq @@ -921,6 +921,7 @@ int Rank::Pipe::accept() rank.lock.Unlock(); connect_seq = peer_cseq + 1; + peer_global_seq = peer_gseq; dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; // send READY @@ -954,6 +955,7 @@ int Rank::Pipe::connect() sd = -1; } __u32 cseq = connect_seq; + __u32 gseq = rank.get_global_seq(); lock.Unlock(); @@ -1021,15 +1023,27 @@ int Rank::Pipe::connect() memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = (char*)&rank.rank_addr; msgvec[0].iov_len = sizeof(rank.rank_addr); - msgvec[1].iov_base = (char*)&cseq; - msgvec[1].iov_len = sizeof(cseq); msg.msg_iov = msgvec; - msg.msg_iovlen = 2; - msglen = msgvec[0].iov_len + msgvec[1].iov_len; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + if (do_sendmsg(newsd, &msg, msglen)) { + dout(2) << "connect couldn't write self addr, " << strerror(errno) << dendl; + goto fail; + } while (1) { + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = (char*)&gseq; + msgvec[0].iov_len = sizeof(gseq); + msgvec[1].iov_base = (char*)&cseq; + msgvec[1].iov_len = sizeof(cseq); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + msglen = msgvec[0].iov_len + msgvec[1].iov_len; + + dout(10) << "connect sending gseq " << gseq << " cseq " << cseq << dendl; if (do_sendmsg(newsd, &msg, msglen)) { - dout(2) << "connect couldn't write self, seq, " << strerror(errno) << dendl; + dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl; goto fail; } dout(20) << "connect wrote (self +) cseq, waiting for tag" << dendl; @@ -1048,34 +1062,40 @@ int Rank::Pipe::connect() dout(0) << "connect got RESETSESSION" << dendl; was_session_reset(); + lock.Unlock(); + continue; + } + if (tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { + int rc = tcp_read(newsd, (char*)&gseq, sizeof(gseq)); + if (rc < 0) { + dout(0) << "connect got RETRY_GLOBAL tag but couldn't read gseq" << dendl; + goto fail; + } + lock.Lock(); + if (state != STATE_CONNECTING) { + dout(0) << "connect got RETRY_GLOBAL, but connection race or something, failing" << dendl; + goto stop_locked; + } + gseq = rank.get_global_seq(gseq); + dout(10) << "connect got RETRY_GLOBAL " << gseq << dendl; + lock.Unlock(); continue; } - if (tag == CEPH_MSGR_TAG_RETRY) { + if (tag == CEPH_MSGR_TAG_RETRY_SESSION) { int rc = tcp_read(newsd, (char*)&cseq, sizeof(cseq)); if (rc < 0) { - dout(0) << "connect got RETRY tag but couldn't read cseq" << dendl; + dout(0) << "connect got RETRY_SESSION tag but couldn't read cseq" << dendl; goto fail; } lock.Lock(); if (state != STATE_CONNECTING) { - dout(0) << "connect got RETRY, but connection race or something, failing" << dendl; + dout(0) << "connect got RETRY_SESSION, but connection race or something, failing" << dendl; goto stop_locked; } assert(cseq > connect_seq); - dout(10) << "connect got RETRY " << connect_seq << " -> " << cseq << dendl; + dout(10) << "connect got RETRY_SESSION " << connect_seq << " -> " << cseq << dendl; connect_seq = cseq; - } - - if (tag == CEPH_MSGR_TAG_RESETSESSION || - tag == CEPH_MSGR_TAG_RETRY) { - // retry lock.Unlock(); - memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = (char*)&cseq; - msgvec[0].iov_len = sizeof(cseq); - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - msglen = msgvec[0].iov_len; continue; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index acaf6dbd24eb..a579967f5743 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -125,7 +125,7 @@ private: list sent; Cond cond; - __u32 connect_seq; + __u32 connect_seq, peer_global_seq; __u32 out_seq; __u32 in_seq, in_seq_acked; @@ -169,7 +169,7 @@ private: sd(-1), state(st), reader_running(false), writer_running(false), - connect_seq(0), + connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { } //~Pipe() { cout << "destructor on " << this << std::endl; } @@ -343,7 +343,10 @@ private: set pipes; list pipe_reap_queue; - + + Mutex global_seq_lock; + __u32 global_seq; + Pipe *connect_rank(const entity_addr_t& addr, const Policy& p); const entity_addr_t &get_rank_addr() { return rank_addr; } @@ -363,6 +366,13 @@ public: int start(bool nodaemon = false); void wait(); + __u32 get_global_seq(__u32 old=0) { + Mutex::Locker l(global_seq_lock); + if (old > global_seq) + global_seq = old; + return ++global_seq; + } + EntityMessenger *register_entity(entity_name_t addr); void rename_entity(EntityMessenger *ms, entity_name_t newaddr); void unregister_entity(EntityMessenger *ms); -- 2.47.3