]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: fix msg destroy race
authorSage Weil <sage@newdream.net>
Mon, 10 Mar 2008 22:43:38 +0000 (15:43 -0700)
committerSage Weil <sage@newdream.net>
Mon, 10 Mar 2008 22:43:38 +0000 (15:43 -0700)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 58aca46c41c005cb8af54b7f6f4f90f473bfab35..dd154f99c5f7f5b054db88b6769057124e355f02 100644 (file)
@@ -127,16 +127,22 @@ int Rank::Accepter::bind(int64_t force_nonce)
 
   /* socket creation */
   listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
-  assert(listen_sd >= 0);
+  if (listen_sd < 0) {
+    derr(0) << "accepter.bind unable to create socket: "
+           << strerror(errno) << dendl;
+    return -errno;
+  }
 
   int on = 1;
   ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
   
   /* bind to port */
   int rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr));
-  if (rc < 0) 
-    derr(0) << "accepter.bind unable to bind to " << g_my_addr.ipaddr << dendl;
-  assert(rc >= 0);
+  if (rc < 0) {
+    derr(0) << "accepter.bind unable to bind to " << g_my_addr.ipaddr
+           << ": " << strerror(errno) << dendl;
+    return -errno;
+  }
 
   // what port did we get?
   socklen_t llen = sizeof(listen_addr);
@@ -146,7 +152,11 @@ int Rank::Accepter::bind(int64_t force_nonce)
 
   // listen!
   rc = ::listen(listen_sd, 128);
-  assert(rc >= 0);
+  if (rc < 0) {
+    derr(0) << "accepter.bind unable to listen on " << g_my_addr.ipaddr 
+           << ": " << strerror(errno) << dendl;
+    return -errno;
+  }
   
   // figure out my_addr
   if (g_my_addr != entity_addr_t()) {
@@ -156,10 +166,10 @@ int Rank::Accepter::bind(int64_t force_nonce)
     // my IP is... HELP!
     struct hostent *myhostname = gethostbyname(hostname); 
     if (!myhostname) {
-      derr(0) << "unable to resolve hostname '" << hostname 
+      derr(0) << "accepter.bind unable to resolve hostname '" << hostname 
              << "', please specify your ip with --bind x.x.x.x" 
              << dendl;
-      exit(0);
+      return -1;
     }
     
     // look up my hostname.
@@ -304,7 +314,7 @@ int Rank::bind(int64_t force_nonce)
   if (started) {
     dout(10) << "rank.bind already started" << dendl;
     lock.Unlock();
-    return 0;
+    return -1;
   }
   dout(10) << "rank.bind" << dendl;
   lock.Unlock();
@@ -1379,16 +1389,28 @@ void Rank::Pipe::writer()
        Message *m = q.front();
        q.pop_front();
        m->set_seq(++out_seq);
-       sent.push_back(m); // move to sent list
        lock.Unlock();
-        dout(20) << "writer sending " << m->get_seq() << " " << m << " " << *m << dendl;
+
+        dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
+
+       // encode and copy out of *m
         if (m->empty_payload()) 
          m->encode_payload();
-       int rc = write_message(m);
+       bufferlist payload, data;
+       payload.claim(m->get_payload());
+       data.claim(m->get_data());
+       ceph_msg_header hdr = m->get_env();
+
+       lock.Lock();
+       sent.push_back(m); // move to sent list
+       lock.Unlock();
+
+        dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
+       int rc = write_message(m, &hdr, payload, data);
        lock.Lock();
        
        if (rc < 0) {
-          derr(1) << "writer error sending " << *m << " to " << m->get_dest() << ", "
+          derr(1) << "writer error sending " << m << " to " << hdr.dst << ", "
                  << errno << ": " << strerror(errno) << dendl;
          fault();
         }
@@ -1574,18 +1596,18 @@ int Rank::Pipe::write_ack(unsigned seq)
 }
 
 
-int Rank::Pipe::write_message(Message *m)
+int Rank::Pipe::write_message(Message *m, ceph_msg_header *env, 
+                             bufferlist &payload, bufferlist &data)
 {
   // get envelope, buffers
-  ceph_msg_header *env = &m->get_env();
-  env->front_len = cpu_to_le32(m->get_payload().length());
-  env->data_len = cpu_to_le32(m->get_data().length());
+  env->front_len = cpu_to_le32(payload.length());
+  env->data_len = cpu_to_le32(data.length());
 
   bufferlist blist;
-  blist.claim( m->get_payload() );
-  blist.append( m->get_data() );
+  blist.claim(payload);
+  blist.append(data);
   
-  dout(20)  << "write_message " << m << " " << *m << " to " << m->get_dest() << dendl;
+  dout(20)  << "write_message " << m << " to " << env->dst << dendl;
   
   // set up msghdr and iovecs
   struct msghdr msg;
index b0740c02dfb725966c2ea3ef7543cfd35391bcba..01fb09d577b8986ce2a17b33bbdcbd0e6d21fee8 100644 (file)
@@ -124,7 +124,8 @@ private:
     void writer();
 
     Message *read_message();
-    int write_message(Message *m);
+    int write_message(Message *m, ceph_msg_header *env, 
+                     bufferlist &payload, bufferlist &data);
     int do_sendmsg(int sd, struct msghdr *msg, int len);
     int write_ack(unsigned s);