struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
int len;
struct socket *sock;
-
+
ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret)
return ret;
atomic_inc(&con->nref);
dout(40, "ceph_queue_con %p %d -> %d\n", con,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
-
+
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
!queue_work(ceph_msgr_wq, &con->work.work)) {
}
con_close_socket(con);
-
+
/* hmm? */
BUG_ON(test_bit(WAIT, &con->state));
-
+
/*
* If there are no messages in the queue, place the
* connection in a STANDBY state. otherwise, retry with
spin_unlock(&con->out_queue_lock);
return;
}
-
+
dout(10, "fault setting BACKOFF\n");
set_bit(BACKOFF, &con->state);
con->delay);
queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay));
-
+
list_splice_init(&con->out_sent, &con->out_queue);
spin_unlock(&con->out_queue_lock);
}
con->out_footer.aborted = cpu_to_le32(con->out_msg->pages == 0);
con->out_kvec[0].iov_base = &con->out_footer;
- con->out_kvec_bytes = con->out_kvec[0].iov_len =
+ con->out_kvec_bytes = con->out_kvec[0].iov_len =
sizeof(con->out_footer);
con->out_kvec_left = 1;
con->out_kvec_cur = con->out_kvec;
con->out_msg = 0;
con->out_more = 0; /* end of message */
-
+
ret = 1;
out:
return ret;
*/
static void prepare_write_ack(struct ceph_connection *con)
{
- dout(20, "prepare_write_ack %p %u -> %u\n", con,
+ dout(20, "prepare_write_ack %p %u -> %u\n", con,
con->in_seq_acked, con->in_seq);
con->in_seq_acked = con->in_seq;
static void prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
- con->out_global_seq = cpu_to_le32(con->global_seq);
- con->out_connect_seq = cpu_to_le32(con->connect_seq);
-
- con->out_kvec[0].iov_base = &msgr->inst.addr;
- con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
- con->out_kvec[1].iov_base = &con->out_global_seq;
- con->out_kvec[1].iov_len = 4;
- con->out_kvec[2].iov_base = &con->out_connect_seq;
- con->out_kvec[2].iov_len = 4;
+ con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
+ con->out_connect.global_seq = cpu_to_le32(con->global_seq);
+
+ con->out_kvec[0].iov_base = CEPH_BANNER;
+ con->out_kvec[0].iov_len = strlen(CEPH_BANNER);
+ con->out_kvec[1].iov_base = &msgr->inst.addr;
+ con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
+ con->out_kvec[2].iov_base = &con->out_connect;
+ con->out_kvec[2].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 3;
- con->out_kvec_bytes = sizeof(msgr->inst.addr) + 8;
+ con->out_kvec_bytes = strlen(CEPH_BANNER) +
+ sizeof(msgr->inst.addr) +
+ sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
static void prepare_write_connect_retry(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
- con->out_global_seq = cpu_to_le32(con->global_seq);
- con->out_connect_seq = cpu_to_le32(con->connect_seq);
+ con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
+ con->out_connect.global_seq = cpu_to_le32(con->global_seq);
- con->out_kvec[0].iov_base = &con->out_global_seq;
- con->out_kvec[0].iov_len = 4;
- con->out_kvec[1].iov_base = &con->out_connect_seq;
- con->out_kvec[1].iov_len = 4;
- con->out_kvec_left = 2;
- con->out_kvec_bytes = 8;
+ con->out_kvec[0].iov_base = &con->out_connect;
+ con->out_kvec[0].iov_len = sizeof(con->out_connect);
+ con->out_kvec_left = 1;
+ con->out_kvec_bytes = sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
-static void prepare_write_accept_announce(struct ceph_messenger *msgr,
- struct ceph_connection *con)
+static void prepare_write_accept_hello(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
{
- con->out_kvec[0].iov_base = &msgr->inst.addr;
- con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
- con->out_kvec_left = 1;
- con->out_kvec_bytes = sizeof(msgr->inst.addr);
+ int len = strlen(CEPH_BANNER);
+
+ con->out_kvec[0].iov_base = CEPH_BANNER;
+ con->out_kvec[0].iov_len = len;
+ con->out_kvec[1].iov_base = &msgr->inst.addr;
+ con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
+ con->out_kvec_left = 2;
+ con->out_kvec_bytes = len + sizeof(msgr->inst.addr);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
+
+ /* we'll re-read the connect request, but not the hello */
+ con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->msgr->inst.addr);
}
/*
/* if first message, set peer_name */
if (con->peer_name.type == 0)
con->peer_name = con->in_msg->hdr.src.name;
-
+
spin_lock(&con->out_queue_lock);
con->in_seq++;
spin_unlock(&con->out_queue_lock);
-
+
dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n",
con->in_msg, le32_to_cpu(con->in_msg->hdr.seq),
ENTITY_NAME(con->in_msg->hdr.src.name),
int ret, to;
dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
- /* actual_peer_addr */
- to = sizeof(con->actual_peer_addr);
+ /* peer's banner */
+ to = strlen(CEPH_BANNER);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&con->in_banner + have,
+ left);
+ if (ret <= 0)
+ goto out;
+ con->in_base_pos += ret;
+ }
+
+ /* peer's addr */
+ to += sizeof(con->actual_peer_addr);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = sizeof(con->actual_peer_addr) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->actual_peer_addr + have,
left);
}
if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
- /* peers connect_seq */
- to += sizeof(con->in_connect_seq);
+ /* peer's connect_seq */
+ to += sizeof(con->in_connect.connect_seq);
if (con->in_base_pos < to) {
int left = to - con->in_base_pos;
- int have = sizeof(con->in_connect_seq) - left;
+ int have = sizeof(con->in_connect.connect_seq) - left;
ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->in_connect_seq +
- have, left);
+ (char *)&con->in_connect.connect_seq +
+ have, left);
if (ret <= 0)
goto out;
con->in_base_pos += ret;
}
}
if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
- /* peers global_seq */
- to += sizeof(con->in_global_seq);
+ /* peer's global_seq */
+ to += sizeof(con->in_connect.global_seq);
if (con->in_base_pos < to) {
int left = to - con->in_base_pos;
- int have = sizeof(con->in_global_seq) - left;
+ int have = sizeof(con->in_connect.global_seq) - left;
ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->in_global_seq +
- have, left);
+ (char *)&con->in_connect.global_seq +
+ have, left);
if (ret <= 0)
goto out;
con->in_base_pos += ret;
out:
dout(20, "read_connect_partial %p end at %d ret %d\n", con,
con->in_base_pos, ret);
- dout(20, "read_connect_partial peer in connect_seq = %u\n",
- le32_to_cpu(con->in_connect_seq));
+ dout(20, "read_connect_partial connect_seq = %u, global_seq = %u\n",
+ le32_to_cpu(con->in_connect.connect_seq),
+ le32_to_cpu(con->in_connect.global_seq));
return ret; /* done */
}
*/
static void reset_connection(struct ceph_connection *con)
{
- derr(1, "%s%d %u.%u.%u.%u:%u connection reset\n",
+ derr(1, "%s%d %u.%u.%u.%u:%u connection reset\n",
ENTITY_NAME(con->peer_name),
IPQUADPORT(con->peer_addr.ipaddr));
spin_unlock(&con->out_queue_lock);
}
+static int verify_hello(struct ceph_connection *con)
+{
+ if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+ derr(10, "connection from %u.%u.%u.%u:%u with bad banner\n",
+ IPQUADPORT(con->peer_addr.ipaddr));
+ con->error_msg = "protocol error, bad banner";
+ return -1;
+ }
+ return 0;
+}
+
static int process_connect(struct ceph_connection *con)
{
dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
+
+ if (verify_hello(con) < 0)
+ return -1;
+
+ /* verify peer addr */
if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr) &&
con->actual_peer_addr.ipaddr.sin_addr.s_addr != 0) {
derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, "
con->error_msg = "protocol error, wrong peer";
return -1;
}
+
switch (con->in_tag) {
case CEPH_MSGR_TAG_RESETSESSION:
dout(10, "process_connect got RESET peer seq %u\n",
- le32_to_cpu(con->in_connect_seq));
+ le32_to_cpu(con->in_connect.connect_seq));
reset_connection(con);
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
case CEPH_MSGR_TAG_RETRY_SESSION:
dout(10,
"process_connect got RETRY my seq = %u, peer_seq = %u\n",
- le32_to_cpu(con->out_connect_seq),
- le32_to_cpu(con->in_connect_seq));
- con->connect_seq = le32_to_cpu(con->in_connect_seq);
+ le32_to_cpu(con->out_connect.connect_seq),
+ le32_to_cpu(con->in_connect.connect_seq));
+ con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
break;
case CEPH_MSGR_TAG_RETRY_GLOBAL:
dout(10,
"process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
- con->global_seq, le32_to_cpu(con->in_global_seq));
+ con->global_seq, le32_to_cpu(con->in_connect.global_seq));
con->global_seq =
get_global_seq(con->msgr,
- le32_to_cpu(con->in_global_seq));
+ le32_to_cpu(con->in_connect.global_seq));
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
break;
int ret;
int to;
- /* peer addr */
- to = sizeof(con->peer_addr);
+ /* banner */
+ to = strlen(CEPH_BANNER);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->peer_addr + have, left);
+ (char *)&con->in_banner + have, left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
}
- /* global_seq */
- to += sizeof(con->in_global_seq);
+ /* peer_addr */
+ to += sizeof(con->peer_addr);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
- int have = sizeof(u32) - left;
+ int have = sizeof(con->peer_addr) - left;
ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->in_global_seq + have,
+ (char *)&con->peer_addr + have,
left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
}
- /* connect_seq */
- to += sizeof(con->in_connect_seq);
- while (con->in_base_pos < to) {
- int left = to - con->in_base_pos;
- int have = sizeof(u32) - left;
- ret = ceph_tcp_recvmsg(con->sock,
- (char *)&con->in_connect_seq + have,
- left);
- if (ret <= 0)
- return ret;
- con->in_base_pos += ret;
+ /* connect */
+ to += sizeof(con->in_connect);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = sizeof(u32) - left;
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&con->in_connect + have, left);
+ if (ret <= 0)
+ return ret;
+ con->in_base_pos += ret;
}
+
+
return 1; /* done */
}
list_splice_init(&new->out_queue, &old->out_queue);
spin_unlock(&old->out_queue_lock);
- new->connect_seq = le32_to_cpu(new->in_connect_seq);
+ new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
new->out_seq = old->out_seq;
/* replace list entry */
/*
* call after a new connection's handshake has completed
*/
-static void process_accept(struct ceph_connection *con)
+static int process_accept(struct ceph_connection *con)
{
struct ceph_connection *existing;
struct ceph_messenger *msgr = con->msgr;
- u32 peer_gseq = le32_to_cpu(con->in_global_seq);
- u32 peer_cseq = le32_to_cpu(con->in_connect_seq);
+ u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
+ u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
+ if (verify_hello(con) < 0)
+ return -1;
+
+ /* connect */
/* do we have an existing connection for this peer? */
if (radix_tree_preload(GFP_NOFS) < 0) {
derr(10, "ENOMEM in process_accept\n");
- return;
+ con->error_msg = "out of memory";
+ return -1;
}
spin_lock(&msgr->con_lock);
existing = __get_connection(msgr, &con->peer_addr);
if (peer_gseq < existing->global_seq) {
/* retry_global */
con->global_seq = existing->global_seq;
- con->out_global_seq =
+ con->out_connect.global_seq =
cpu_to_le32(con->global_seq);
prepare_write_accept_retry(con,
- &tag_retry_global,
- &con->out_global_seq);
+ &tag_retry_global,
+ &con->out_connect.global_seq);
} else if (test_bit(LOSSY, &existing->state)) {
dout(20, "process_accept replacing existing LOSSY %p\n",
existing);
/* old attempt or peer didn't get the READY */
/* send retry with peers connect seq */
con->connect_seq = existing->connect_seq;
- con->out_connect_seq =
+ con->out_connect.connect_seq =
cpu_to_le32(con->connect_seq);
prepare_write_accept_retry(con,
&tag_retry_session,
- &con->out_connect_seq);
+ &con->out_connect.connect_seq);
}
} else if (peer_cseq == existing->connect_seq &&
(test_bit(CONNECTING, &existing->state) ||
ceph_queue_con(con);
put_connection(con);
+ return 0;
}
ret = read_accept_partial(con);
if (ret <= 0)
goto done;
- process_accept(con); /* accepted */
+ if (process_accept(con) < 0) {
+ ret = -1;
+ goto out;
+ }
goto more;
}
if (test_bit(CONNECTING, &con->state)) {
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
-
+
more:
if (test_and_set_bit(BUSY, &con->state) != 0) {
dout(10, "con_work %p BUSY already set\n", con);
}
dout(10, "con_work %p start, clearing QUEUED\n", con);
clear_bit(QUEUED, &con->state);
-
+
if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
dout(5, "con_work CLOSED\n");
goto done;
}
if (test_and_clear_bit(BACKOFF, &con->state))
dout(5, "con_work cleared BACKOFF\n");
-
+
if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
try_read(con) < 0 ||
try_write(con) < 0)
}
dout(5, "accepted connection \n");
- prepare_write_accept_announce(msgr, newcon);
+ prepare_write_accept_hello(msgr, newcon);
add_connection_accepting(msgr, newcon);
/* hand off to work queue; we may have missed socket state change */
struct ceph_connection *con;
dout(2, "destroy %p\n", msgr);
-
+
/* stop listener */
msgr->listen_sock->ops->shutdown(msgr->listen_sock, SHUT_RDWR);
sock_release(msgr->listen_sock);
dout(40, " get %p %d -> %d\n", con,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
__remove_connection(msgr, con);
-
+
/* in case there's queued work... */
spin_unlock(&msgr->con_lock);
if (cancel_delayed_work_sync(&con->work))
if (atomic_read(&old->nref) == 1)
return old; /* we have only ref, all is well */
-
- dup = ceph_msg_new(le32_to_cpu(old->hdr.type),
+
+ dup = ceph_msg_new(le32_to_cpu(old->hdr.type),
le32_to_cpu(old->hdr.front_len),
- le32_to_cpu(old->hdr.data_len),
+ le32_to_cpu(old->hdr.data_len),
le32_to_cpu(old->hdr.data_off),
old->pages);
BUG_ON(!dup);
memcpy(dup->front.iov_base, old->front.iov_base,
le32_to_cpu(old->hdr.front_len));
-
+
/* revoke old message's pages */
mutex_lock(&old->page_mutex);
old->pages = 0;
derr(10, "ENOMEM in ceph_msg_send\n");
return ret;
}
-
+
spin_lock(&msgr->con_lock);
con = __get_connection(msgr, &msg->hdr.dst.addr);
if (con) {
assert(state == STATE_ACCEPTING);
// announce myself.
- int rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr));
+ int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));
+ if (rc < 0) {
+ dout(10) << "accept couldn't write banner" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+
+ // and my addr
+ rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr));
if (rc < 0) {
dout(10) << "accept couldn't write my addr" << dendl;
state = STATE_CLOSED;
dout(10) << "accept sd=" << sd << dendl;
// identify peer
+ char banner[strlen(CEPH_BANNER)];
+ rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
+ if (rc < 0) {
+ dout(10) << "accept couldn't read peer_addr" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+ if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+ dout(10) << "accept peer sent bad banner" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));
if (rc < 0) {
- dout(10) << "accept couldn't read peer addr" << dendl;
+ dout(10) << "accept couldn't read peer_addr" << dendl;
state = STATE_CLOSED;
return -1;
}
dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
}
- __u32 peer_gseq, peer_cseq;
+ ceph_msg_connect connect;
Pipe *existing = 0;
// this should roughly mirror pseudocode at
// http://ceph.newdream.net/wiki/Messaging_protocol
while (1) {
- rc = tcp_read(sd, (char*)&peer_gseq, sizeof(peer_gseq));
+ rc = tcp_read(sd, (char*)&connect, sizeof(connect));
if (rc < 0) {
- dout(10) << "accept couldn't read connect peer_gseq" << dendl;
+ dout(10) << "accept couldn't read connect" << dendl;
goto fail;
}
- rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
- if (rc < 0) {
- dout(10) << "accept couldn't read connect peer_seq" << dendl;
- goto fail;
- }
- dout(20) << "accept got peer_connect_seq " << peer_cseq << dendl;
+ dout(20) << "accept got peer_connect_seq " << connect.connect_seq << dendl;
rank.lock.Lock();
existing = rank.rank_pipe[peer_addr];
existing->lock.Lock();
- if (peer_gseq < existing->peer_global_seq) {
+ if (connect.global_seq < existing->peer_global_seq) {
dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
- << " > " << peer_gseq << ", RETRY_GLOBAL" << dendl;
- __u32 gseq = existing->peer_global_seq; // so we can send it below..
+ << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
+ __le32 gseq;
+ gseq = existing->peer_global_seq; // so we can send it below..
existing->lock.Unlock();
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
goto replace;
}
- if (peer_cseq < existing->connect_seq) {
- if (peer_cseq == 0) {
+ if (connect.connect_seq < existing->connect_seq) {
+ if (connect.connect_seq == 0) {
dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
goto replace;
} else {
// old attempt, or we sent READY but they didn't get it.
dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
- << " > " << peer_cseq << ", RETRY_SESSION" << dendl;
- __u32 cseq = existing->connect_seq; // so we can send it below..
+ << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
+ __le32 cseq;
+ cseq = existing->connect_seq; // so we can send it below..
existing->lock.Unlock();
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RETRY_SESSION;
}
}
- if (peer_cseq == existing->connect_seq) {
+ if (connect.connect_seq == existing->connect_seq) {
// connection race
if (peer_addr < rank.rank_addr) {
// incoming wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
- << " == " << peer_cseq << ", replacing my attempt" << dendl;
+ << " == " << connect.connect_seq << ", replacing my attempt" << dendl;
assert(existing->state == STATE_CONNECTING ||
existing->state == STATE_WAIT);
goto replace;
} else {
// our existing outgoing wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
- << " == " << peer_cseq << ", sending WAIT" << dendl;
+ << " == " << connect.connect_seq << ", sending WAIT" << dendl;
assert(peer_addr > rank.rank_addr);
assert(existing->state == STATE_CONNECTING); // this will win
existing->lock.Unlock();
}
}
- assert(peer_cseq > existing->connect_seq);
- assert(peer_gseq >= existing->peer_global_seq);
+ assert(connect.connect_seq > existing->connect_seq);
+ assert(connect.global_seq >= existing->peer_global_seq);
if (existing->connect_seq == 0) {
- dout(0) << "accept we reset (peer sent cseq " << peer_cseq
+ dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq
<< ", " << existing << ".cseq = " << existing->connect_seq
<< "), sending RESETSESSION" << dendl;
rank.lock.Unlock();
continue;
} else {
// reconnect
- dout(10) << "accept peer sent cseq " << peer_cseq
+ dout(10) << "accept peer sent cseq " << connect.connect_seq
<< " > " << existing->connect_seq << dendl;
goto replace;
}
assert(0);
} // existing
- else if (peer_cseq > 0) {
+ else if (connect.connect_seq > 0) {
// we reset, and they are opening a new session
- dout(0) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
+ dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RESETSESSION;
if (tcp_write(sd, &tag, 1) < 0)
register_pipe();
rank.lock.Unlock();
- connect_seq = peer_cseq + 1;
- peer_global_seq = peer_gseq;
+ connect_seq = connect.connect_seq + 1;
+ peer_global_seq = connect.global_seq;
dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
// send READY
char tag = -1;
int rc;
struct sockaddr_in myAddr;
- entity_addr_t paddr;
struct msghdr msg;
struct iovec msgvec[2];
int msglen;
-
+ char banner[strlen(CEPH_BANNER)];
+ entity_addr_t paddr;
+
// create socket?
newsd = ::socket(AF_INET, SOCK_STREAM, 0);
if (newsd < 0) {
dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
}
+ // verify banner
+ // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
+ rc = tcp_read(newsd, (char*)&banner, strlen(CEPH_BANNER));
+ if (rc < 0) {
+ dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
+ goto fail;
+ }
+ if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+ dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl;
+ goto fail;
+ }
+
+ memset(&msg, 0, sizeof(msg));
+ msgvec[0].iov_base = banner;
+ msgvec[0].iov_len = strlen(banner);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 1;
+ msglen = msgvec[0].iov_len;
+ if (do_sendmsg(newsd, &msg, msglen)) {
+ dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
+ goto fail;
+ }
+
// identify peer
rc = tcp_read(newsd, (char*)&paddr, sizeof(paddr));
if (rc < 0) {
goto fail;
}
}
-
+
// identify myself, and send initial cseq
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&rank.rank_addr;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
if (do_sendmsg(newsd, &msg, msglen)) {
- dout(2) << "connect couldn't write self addr, " << strerror(errno) << dendl;
+ dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
goto fail;
}
while (1) {
+ ceph_msg_connect connect;
+ connect.global_seq = gseq;
+ connect.connect_seq = cseq;
memset(&msg, 0, sizeof(msg));
- msgvec[0].iov_base = (char*)&gseq;
- msgvec[0].iov_len = sizeof(gseq);
- msgvec[1].iov_base = (char*)&cseq;
- msgvec[1].iov_len = sizeof(cseq);
+ msgvec[0].iov_base = (char*)&connect;
+ msgvec[0].iov_len = sizeof(connect);
msg.msg_iov = msgvec;
- msg.msg_iovlen = 2;
- msglen = msgvec[0].iov_len + msgvec[1].iov_len;
+ msg.msg_iovlen = 1;
+ msglen = msgvec[0].iov_len;
dout(10) << "connect sending gseq " << gseq << " cseq " << cseq << dendl;
if (do_sendmsg(newsd, &msg, msglen)) {
dout(10) << "write_ack " << seq << dendl;
char c = CEPH_MSGR_TAG_ACK;
- __u32 s = seq;/*cpu_to_le32(seq);*/
+ __le32 s;
+ s = seq;
struct msghdr msg;
memset(&msg, 0, sizeof(msg));