/* socket creation */
listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
- assert(listen_sd >= 0);
+ if (listen_sd < 0) {
+ derr(0) << "accepter.bind unable to create socket: "
+ << strerror(errno) << dendl;
+ return -errno;
+ }
int on = 1;
::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
/* bind to port */
int rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr));
- if (rc < 0)
- derr(0) << "accepter.bind unable to bind to " << g_my_addr.ipaddr << dendl;
- assert(rc >= 0);
+ if (rc < 0) {
+ derr(0) << "accepter.bind unable to bind to " << g_my_addr.ipaddr
+ << ": " << strerror(errno) << dendl;
+ return -errno;
+ }
// what port did we get?
socklen_t llen = sizeof(listen_addr);
// listen!
rc = ::listen(listen_sd, 128);
- assert(rc >= 0);
+ if (rc < 0) {
+ derr(0) << "accepter.bind unable to listen on " << g_my_addr.ipaddr
+ << ": " << strerror(errno) << dendl;
+ return -errno;
+ }
// figure out my_addr
if (g_my_addr != entity_addr_t()) {
// my IP is... HELP!
struct hostent *myhostname = gethostbyname(hostname);
if (!myhostname) {
- derr(0) << "unable to resolve hostname '" << hostname
+ derr(0) << "accepter.bind unable to resolve hostname '" << hostname
<< "', please specify your ip with --bind x.x.x.x"
<< dendl;
- exit(0);
+ return -1;
}
// look up my hostname.
if (started) {
dout(10) << "rank.bind already started" << dendl;
lock.Unlock();
- return 0;
+ return -1;
}
dout(10) << "rank.bind" << dendl;
lock.Unlock();
Message *m = q.front();
q.pop_front();
m->set_seq(++out_seq);
- sent.push_back(m); // move to sent list
lock.Unlock();
- dout(20) << "writer sending " << m->get_seq() << " " << m << " " << *m << dendl;
+
+ dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
+
+ // encode and copy out of *m
if (m->empty_payload())
m->encode_payload();
- int rc = write_message(m);
+ bufferlist payload, data;
+ payload.claim(m->get_payload());
+ data.claim(m->get_data());
+ ceph_msg_header hdr = m->get_env();
+
+ lock.Lock();
+ sent.push_back(m); // move to sent list
+ lock.Unlock();
+
+ dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
+ int rc = write_message(m, &hdr, payload, data);
lock.Lock();
if (rc < 0) {
- derr(1) << "writer error sending " << *m << " to " << m->get_dest() << ", "
+ derr(1) << "writer error sending " << m << " to " << hdr.dst << ", "
<< errno << ": " << strerror(errno) << dendl;
fault();
}
}
-int Rank::Pipe::write_message(Message *m)
+int Rank::Pipe::write_message(Message *m, ceph_msg_header *env,
+ bufferlist &payload, bufferlist &data)
{
// get envelope, buffers
- ceph_msg_header *env = &m->get_env();
- env->front_len = cpu_to_le32(m->get_payload().length());
- env->data_len = cpu_to_le32(m->get_data().length());
+ env->front_len = cpu_to_le32(payload.length());
+ env->data_len = cpu_to_le32(data.length());
bufferlist blist;
- blist.claim( m->get_payload() );
- blist.append( m->get_data() );
+ blist.claim(payload);
+ blist.append(data);
- dout(20) << "write_message " << m << " " << *m << " to " << m->get_dest() << dendl;
+ dout(20) << "write_message " << m << " to " << env->dst << dendl;
// set up msghdr and iovecs
struct msghdr msg;