]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Avoid encoding message with lock holding
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 26 May 2015 07:03:58 +0000 (15:03 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sat, 30 May 2015 14:29:54 +0000 (22:29 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/test/msgr/test_msgr.cc

index 8373432bc5c40fd510fdd805cdbdc246124949df..9ef88ab8d340ac8bad1fbab283b3f3a7a9134a4a 100644 (file)
@@ -39,6 +39,9 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
                 << ").";
 }
 
+// Notes:
+// 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead
+
 const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
 
 class C_time_wakeup : public EventCallback {
@@ -100,12 +103,7 @@ class C_handle_dispatch : public EventCallback {
  public:
   C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {}
   void do_request(int id) {
-    //msgr->ms_fast_preprocess(m);
-    //if (msgr->ms_can_fast_dispatch(m)) {
-    //  msgr->ms_fast_dispatch(m);
-    //} else {
-      msgr->ms_deliver_dispatch(m);
-    //}
+    msgr->ms_deliver_dispatch(m);
   }
 };
 
@@ -176,8 +174,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
   : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0),
-    out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1),
-    port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL),
+    out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0),
+    sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0),
+    open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL),
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
     is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
@@ -272,7 +271,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
 // else return < 0 means error
 int AsyncConnection::_try_send(bufferlist send_bl, bool send)
 {
-  assert(lock.is_locked());
+  assert(write_lock.is_locked());
   if (send_bl.length()) {
     if (outcoming_bl.length())
       outcoming_bl.claim_append(send_bl);
@@ -307,7 +306,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
     }
   }
 
-  uint64_t sent = 0;
+  uint64_t sent_bytes = 0;
   list<bufferptr>::const_iterator pb = outcoming_bl.buffers().begin();
   uint64_t left_pbrs = outcoming_bl.buffers().size();
   while (left_pbrs) {
@@ -332,7 +331,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
       return r;
 
     // "r" is the remaining length
-    sent += msglen - r;
+    sent_bytes += msglen - r;
     if (r > 0) {
       ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
                           << " needed to be sent, creating event for writing"
@@ -343,14 +342,14 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send)
   }
 
   // trim already sent for outcoming_bl
-  if (sent) {
+  if (sent_bytes) {
     bufferlist bl;
-    if (sent < outcoming_bl.length())
-      outcoming_bl.splice(sent, outcoming_bl.length()-sent, &bl);
+    if (sent_bytes < outcoming_bl.length())
+      outcoming_bl.splice(sent_bytes, outcoming_bl.length()-sent_bytes, &bl);
     bl.swap(outcoming_bl);
   }
 
-  ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent
+  ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes
                              << " remaining bytes " << outcoming_bl.length() << dendl;
 
   if (!open_write && is_queued()) {
@@ -507,7 +506,9 @@ void AsyncConnection::process()
           ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
           t = (ceph_timespec*)state_buffer;
           utime_t kp_t = utime_t(*t);
+          write_lock.Lock();
           _send_keepalive_or_ack(true, &kp_t);
+          write_lock.Unlock();
           ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
           state = STATE_OPEN;
           break;
@@ -955,6 +956,7 @@ int AsyncConnection::_process_connection()
   switch(state) {
     case STATE_WAIT_SEND:
       {
+        Mutex::Locker l(write_lock);
         if (!outcoming_bl.length()) {
           assert(state_after_send);
           state = state_after_send;
@@ -1016,7 +1018,7 @@ int AsyncConnection::_process_connection()
 
         bufferlist bl;
         bl.append(state_buffer, strlen(CEPH_BANNER));
-        r = _try_send(bl);
+        r = try_send(bl);
         if (r == 0) {
           state = STATE_CONNECTING_WAIT_IDENTIFY_PEER;
           ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: "
@@ -1091,7 +1093,7 @@ int AsyncConnection::_process_connection()
         }
 
         ::encode(async_msgr->get_myaddr(), myaddrbl);
-        r = _try_send(myaddrbl);
+        r = try_send(myaddrbl);
         if (r == 0) {
           state = STATE_CONNECTING_SEND_CONNECT_MSG;
           ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr "
@@ -1139,7 +1141,7 @@ int AsyncConnection::_process_connection()
         ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq="
             << connect_seq << " proto=" << connect_msg.protocol_version << dendl;
 
-        r = _try_send(bl);
+        r = try_send(bl);
         if (r == 0) {
           state = STATE_CONNECTING_WAIT_CONNECT_REPLY;
           ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl;
@@ -1212,7 +1214,6 @@ int AsyncConnection::_process_connection()
     case STATE_CONNECTING_WAIT_ACK_SEQ:
       {
         uint64_t newly_acked_seq = 0;
-        bufferlist bl;
 
         r = read_until(sizeof(newly_acked_seq), state_buffer);
         if (r < 0) {
@@ -1224,19 +1225,21 @@ int AsyncConnection::_process_connection()
 
         newly_acked_seq = *((uint64_t*)state_buffer);
         ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
-                            << " vs out_seq " << out_seq << dendl;
-        while (newly_acked_seq > out_seq) {
-          Message *m = _get_next_outgoing();
-          assert(m);
-          ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
-                              << " " << *m << dendl;
-          assert(m->get_seq() <= newly_acked_seq);
-          m->put();
-          ++out_seq;
-        }
+                            << " vs out_seq " << out_seq.read() << dendl;
+        discard_requeued_up_to(newly_acked_seq);
+        //while (newly_acked_seq > out_seq.read()) {
+        //  Message *m = _get_next_outgoing(NULL);
+        //  assert(m);
+        //  ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
+        //                      << " " << *m << dendl;
+        //  assert(m->get_seq() <= newly_acked_seq);
+        //  m->put();
+        //  out_seq.inc();
+        //}
 
+        bufferlist bl;
         bl.append((char*)&in_seq, sizeof(in_seq));
-        r = _try_send(bl);
+        r = try_send(bl);
         if (r == 0) {
           state = STATE_CONNECTING_READY;
           ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl;
@@ -1283,8 +1286,11 @@ int AsyncConnection::_process_connection()
 
         // message may in queue between last _try_send and connection ready
         // write event may already notify and we need to force scheduler again
+        write_lock.Lock();
+        can_write = 1;
         if (is_queued())
           center->dispatch_event_external(write_handler);
+        write_lock.Unlock();
 
         break;
       }
@@ -1313,7 +1319,7 @@ int AsyncConnection::_process_connection()
         ::encode(socket_addr, bl);
         ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl;
 
-        r = _try_send(bl);
+        r = try_send(bl);
         if (r == 0) {
           state = STATE_ACCEPTING_WAIT_BANNER_ADDR;
           ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: "
@@ -1442,6 +1448,9 @@ int AsyncConnection::_process_connection()
         ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
         state = STATE_OPEN;
         memset(&connect_msg, 0, sizeof(connect_msg));
+        write_lock.Lock();
+        can_write = 1;
+        write_lock.Unlock();
         break;
       }
 
@@ -1743,6 +1752,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     existing->center->dispatch_event_external(existing->reset_handler);
     existing->_stop();
   } else {
+    assert(can_write == 0);
+    existing->write_lock.Lock(true);
     // queue a reset on the new connection, which we're dumping for the old
     center->dispatch_event_external(reset_handler);
 
@@ -1767,6 +1778,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     existing->requeue_sent();
 
     swap(existing->sd, sd);
+    swap(existing->can_write, can_write);
+    existing->can_write = 0;
     existing->open_write = false;
     existing->replacing = true;
     existing->state_offset = 0;
@@ -1776,6 +1789,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     // there shouldn't exist any buffer
     assert(recv_start == recv_end);
 
+    existing->write_lock.Unlock();
     if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
       // handle error
       existing->fault();
@@ -1785,6 +1799,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     existing->lock.Unlock();
     return 0;
   }
+  existing->write_lock.Unlock();
   existing->lock.Unlock();
 
  open:
@@ -1858,7 +1873,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     goto fail_registered;
   }
 
-  r = _try_send(reply_bl);
+  r = try_send(reply_bl);
   if (r < 0)
     goto fail_registered;
 
@@ -1919,34 +1934,40 @@ void AsyncConnection::accept(int incoming)
 int AsyncConnection::send_message(Message *m)
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
+
+  if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
+   ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
+   Mutex::Locker l(write_lock);
+   local_messages.push_back(m);
+   center->dispatch_event_external(local_deliver_handler);
+   return 0;
+  }
+
+  bufferlist bl;
+  Mutex::Locker l(write_lock);
+  m->set_seq(out_seq.inc());
   m->get_header().src = async_msgr->get_myname();
   if (!m->get_priority())
     m->set_priority(async_msgr->get_default_send_priority());
+  if (can_write == 1)
+    prepare_send_message(m, bl);
 
-  Mutex::Locker l(lock);
-  if (!is_queued() && state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+  if (!is_queued() && can_write == 1) {
     ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
-    int r = _send(m);
+    int r = write_message(m, bl);
     if (r < 0) {
       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
       // we want to handle fault within internal thread
       center->dispatch_event_external(write_handler);
     }
-  } else if (state == STATE_CLOSED) {
+  } else if (can_write == 2) {
     ldout(async_msgr->cct, 10) << __func__ << " connection closed."
                                << " Drop message " << m << dendl;
     m->put();
-  } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
-    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
-    local_messages.push_back(m);
-    center->dispatch_event_external(local_deliver_handler);
   } else {
-    out_q[m->get_priority()].push_back(m);
-    if (state == STATE_STANDBY && !policy.server) {
-      ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
-                                 << " policy.server is false" << dendl;
-      _connect();
-    } else if (sd >= 0 && !open_write) {
+    out_q[m->get_priority()].push_back(make_pair(bl, m));
+    if (can_write == 0) {
+      ldout(async_msgr->cct, 10) << __func__ << " write is denied, reschedule m=" << m << dendl;
       center->dispatch_event_external(write_handler);
     }
   }
@@ -1955,35 +1976,35 @@ int AsyncConnection::send_message(Message *m)
 
 void AsyncConnection::requeue_sent()
 {
+  assert(write_lock.is_locked());
   if (sent.empty())
     return;
 
-  list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+  list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
   while (!sent.empty()) {
-    Message *m = sent.back();
+    pair<bufferlist, Message*> p = sent.back();
     sent.pop_back();
-    ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq
-                         << " (" << m->get_seq() << ")" << dendl;
-    rq.push_front(m);
-    out_seq--;
+    ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
+                         << " (" << p.second->get_seq() << ")" << dendl;
+    rq.push_front(p);
   }
 }
 
 void AsyncConnection::discard_requeued_up_to(uint64_t seq)
 {
   ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl;
+  Mutex::Locker l(write_lock);
   if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
     return;
-  list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+  list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
   while (!rq.empty()) {
-    Message *m = rq.front();
-    if (m->get_seq() == 0 || m->get_seq() > seq)
+    pair<bufferlist, Message*> p = rq.front();
+    if (p.second->get_seq() == 0 || p.second->get_seq() > seq)
       break;
-    ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq
+    ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq()
                          << " <= " << seq << ", discarding" << dendl;
-    m->put();
+    p.second->put();
     rq.pop_front();
-    out_seq++;
   }
   if (rq.empty())
     out_q.erase(CEPH_MSG_PRIO_HIGHEST);
@@ -1997,15 +2018,16 @@ void AsyncConnection::discard_out_queue()
 {
   ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
 
-  for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
-    ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
-    (*p)->put();
+  Mutex::Locker l(write_lock);
+  for (list<pair<bufferlist, Message*> >::iterator p = sent.begin(); p != sent.end(); ++p) {
+    ldout(async_msgr->cct, 20) << __func__ << " discard " << p->second << dendl;
+    p->second->put();
   }
   sent.clear();
-  for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
-    for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
-      ldout(async_msgr->cct, 20) << __func__ << " discard " << *r << dendl;
-      (*r)->put();
+  for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
+    for (list<pair<bufferlist, Message*> >::iterator r = p->second.begin(); r != p->second.end(); ++r) {
+      ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl;
+      r->second->put();
     }
   out_q.clear();
   outcoming_bl.clear();
@@ -2016,13 +2038,15 @@ int AsyncConnection::randomize_out_seq()
   if (get_features() & CEPH_FEATURE_MSG_AUTH) {
     // Set out_seq to a random value, so CRC won't be predictable.   Don't bother checking seq_error
     // here.  We'll check it on the call.  PLR
-    int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
-    out_seq &= SEQ_MASK;
-    lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << out_seq << dendl;
+    uint64_t rand_seq;
+    int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
+    rand_seq &= SEQ_MASK;
+    lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
+    out_seq.set(rand_seq);
     return seq_error;
   } else {
     // previously, seq #'s always started at 0.
-    out_seq = 0;
+    out_seq.set(0);
     return 0;
   }
 }
@@ -2042,12 +2066,14 @@ void AsyncConnection::fault()
     return ;
   }
 
+  write_lock.Lock();
   if (sd >= 0) {
     shutdown_socket();
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
     ::close(sd);
     sd = -1;
   }
+  can_write = 0;
   open_write = false;
 
   // requeue sent items
@@ -2063,15 +2089,19 @@ void AsyncConnection::fault()
                               << " accept state just closed, state="
                               << get_state_name(state) << dendl;
     center->dispatch_event_external(reset_handler);
+
+    write_lock.Unlock();
     _stop();
     return ;
   }
   if (policy.standby && !is_queued()) {
     ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
     state = STATE_STANDBY;
+    write_lock.Unlock();
     return;
   }
 
+  write_lock.Unlock();
   if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
     // policy maybe empty when state is in accept
     if (policy.server) {
@@ -2108,7 +2138,7 @@ void AsyncConnection::was_session_reset()
   center->dispatch_event_external(remote_reset_handler);
 
   if (randomize_out_seq()) {
-    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
   }
 
   in_seq = 0;
@@ -2131,7 +2161,9 @@ void AsyncConnection::_stop()
   async_msgr->unregister_conn(this);
 
   state = STATE_CLOSED;
+  Mutex::Locker l(write_lock);
   open_write = false;
+  can_write = 2;
   state_offset = 0;
   if (sd >= 0) {
     shutdown_socket();
@@ -2145,33 +2177,35 @@ void AsyncConnection::_stop()
   center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
 }
 
-int AsyncConnection::_send(Message *m)
+void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl)
 {
-  m->set_seq(++out_seq);
-  if (!policy.lossy) {
-    // put on sent list
-    sent.push_back(m); 
-    m->get();
-  }
+  assert(write_lock.is_locked());
+  ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
 
   // associate message with Connection (for benefit of encode_payload)
   m->set_connection(this);
-
   uint64_t features = get_features();
   if (m->empty_payload())
-    ldout(async_msgr->cct, 20) << __func__ << " encoding " << m->get_seq() << " features " << features
-                         << " " << m << " " << *m << dendl;
+    ldout(async_msgr->cct, 20) << __func__ << " encoding features "
+                               << features << " " << m << " " << *m << dendl;
   else
-    ldout(async_msgr->cct, 20) << __func__ << " half-reencoding " << m->get_seq() << " features "
-                         << features << " " << m << " " << *m << dendl;
+    ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features "
+                               << features << " " << m << " " << *m << dendl;
 
   // encode and copy out of *m
-  m->encode(features, async_msgr->crcflags);
+  m->encode(features, msgr->crcflags);
 
   // prepare everything
   ceph_msg_header& header = m->get_header();
   ceph_msg_footer& footer = m->get_footer();
 
+  ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
+                             << " src " << entity_name_t(header.src)
+                             << " front=" << header.front_len
+                             << " data=" << header.data_len
+                             << " off " << header.data_off << dendl;
+
+
   // Now that we have all the crcs calculated, handle the
   // digital signature for the message, if the AsyncConnection has session
   // security set up.  Some session security options do not
@@ -2181,46 +2215,18 @@ int AsyncConnection::_send(Message *m)
     ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl;
   } else {
     if (session_security->sign_message(m)) {
-      ldout(async_msgr->cct, 20) << __func__ << " failed to sign seq # "
-                           << header.seq << "): sig = " << footer.sig << dendl;
+      ldout(async_msgr->cct, 20) << __func__ << " failed to sign m="
+                                 << m << "): sig = " << footer.sig << dendl;
     } else {
-      ldout(async_msgr->cct, 20) << __func__ << " signed seq # " << header.seq
-                           << "): sig = " << footer.sig << dendl;
+      ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m
+                                 << "): sig = " << footer.sig << dendl;
     }
   }
 
-  bufferlist blist = m->get_payload();
-  blist.append(m->get_middle());
-  blist.append(m->get_data());
-
-  ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
-                       << " " << m << dendl;
-  int rc = write_message(header, footer, blist);
-
-  if (rc < 0) {
-    ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
-                        << cpp_strerror(errno) << dendl;
-  } else if (rc == 0) {
-    ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
-  } else {
-    ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
-  }
-  m->put();
-
-  return rc;
-}
-
-int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer,
-                                  bufferlist& blist)
-{
-  bufferlist bl;
-  int ret;
-
   // send tag
   char tag = CEPH_MSGR_TAG_MSG;
   bl.append(&tag, sizeof(tag));
 
-  // send envelope
   ceph_msg_header_old oldheader;
   if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
     bl.append((char*)&header, sizeof(header));
@@ -2239,7 +2245,9 @@ int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg
     bl.append((char*)&oldheader, sizeof(oldheader));
   }
 
-  bl.claim_append(blist);
+  bl.claim_append(m->get_payload());
+  bl.append(m->get_middle());
+  bl.append(m->get_data());
 
   // send footer; if receiver doesn't support signatures, use the old footer format
   ceph_msg_footer_old old_footer;
@@ -2257,26 +2265,46 @@ int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg
     old_footer.flags = footer.flags;
     bl.append((char*)&old_footer, sizeof(old_footer));
   }
+}
 
-  // send
-  ret = _try_send(bl);
-  if (ret < 0)
-    return ret;
+int AsyncConnection::write_message(Message *m, bufferlist& bl)
+{
+  assert(write_lock.is_locked());
+  assert(can_write == 1);
+  if (!policy.lossy) {
+    // put on sent list
+    sent.push_back(make_pair(bl, m));
+    m->get();
+  }
 
-  return ret;
+  ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
+                             << " " << m << dendl;
+  int rc = _try_send(bl);
+  if (rc < 0) {
+    ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
+                              << cpp_strerror(errno) << dendl;
+  } else if (rc == 0) {
+    ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
+  } else {
+    ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
+  }
+  m->put();
+
+  return rc;
 }
 
 void AsyncConnection::handle_ack(uint64_t seq)
 {
   ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
   // trim sent list
-  while (!sent.empty() && sent.front()->get_seq() <= seq) {
-    Message *m = sent.front();
+  Mutex::Locker l(write_lock);
+  while (!sent.empty() && sent.front().second->get_seq() <= seq) {
+    pair<bufferlist, Message*> p = sent.front();
     sent.pop_front();
     ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
-                               << seq << " >= " << m->get_seq() << " on "
-                               << m << " " << *m << dendl;
-    m->put();
+                               << seq << " >= " << p.second->get_seq() << " on "
+                               << p.second << " " << *(p.second) << dendl;
+    p.second->put();
   }
 }
 
@@ -2299,7 +2327,7 @@ void AsyncConnection::mark_down()
 
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 {
-  assert(lock.is_locked());
+  assert(write_lock.is_locked());
   bufferlist bl;
 
   utime_t t = ceph_clock_now(async_msgr->cct);
@@ -2329,6 +2357,8 @@ void AsyncConnection::handle_write()
   Mutex::Locker l(lock);
   bufferlist bl;
   int r = 0;
+
+  write_lock.Lock();
   if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
     if (keepalive) {
       _send_keepalive_or_ack();
@@ -2336,12 +2366,17 @@ void AsyncConnection::handle_write()
     }
 
     while (1) {
-      Message *m = _get_next_outgoing();
+      bufferlist data;
+      Message *m = _get_next_outgoing(&data);
       if (!m)
         break;
 
+      // send_message may not encode message
+      if (!data.length())
+        prepare_send_message(m, data);
+
       ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
-      r = _send(m);
+      r = write_message(m, data);
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
         goto fail;
@@ -2366,16 +2401,22 @@ void AsyncConnection::handle_write()
       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
       goto fail;
     }
-  } else if (state != STATE_CONNECTING && state != STATE_CLOSED) {
+  } else if (state == STATE_STANDBY && !policy.server && is_queued()) {
+    ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
+                               << " policy.server is false" << dendl;
+    _connect();
+  } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
     r = _try_send(bl);
     if (r < 0) {
       ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
       goto fail;
     }
   }
+  write_lock.Unlock();
 
   return ;
  fail:
+  write_lock.Unlock();
   fault();
 }
 
@@ -2390,7 +2431,7 @@ void AsyncConnection::wakeup_from(uint64_t id)
 void AsyncConnection::local_deliver()
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
-  Mutex::Locker l(lock);
+  Mutex::Locker l(write_lock);
   while (!local_messages.empty()) {
     Message *m = local_messages.back();
     local_messages.pop_back();
@@ -2398,12 +2439,12 @@ void AsyncConnection::local_deliver()
     m->set_recv_stamp(ceph_clock_now(async_msgr->cct));
     ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl;
     async_msgr->ms_fast_preprocess(m);
-    lock.Unlock();
+    write_lock.Unlock();
     if (async_msgr->ms_can_fast_dispatch(m)) {
       async_msgr->ms_fast_dispatch(m);
     } else {
       msgr->ms_deliver_dispatch(m);
     }
-    lock.Lock();
+    write_lock.Lock();
   }
 }
index 8c677f5245012e7f523c187fbdab0b906365b27e..85a203ba357b27671eca28b3b9b4e741f6c29309 100644 (file)
@@ -45,10 +45,15 @@ class AsyncConnection : public Connection {
 
   int read_bulk(int fd, char *buf, int len);
   int do_sendmsg(struct msghdr &msg, int len, bool more);
+  int try_send(bufferlist bl, bool send=true) {
+    Mutex::Locker l(write_lock);
+    return _try_send(bl, send);
+  }
   // if "send" is false, it will only append bl to send buffer
   // the main usage is avoid error happen outside messenger threads
   int _try_send(bufferlist bl, bool send=true);
   int _send(Message *m);
+  void prepare_send_message(Message *m, bufferlist &bl);
   int read_until(uint64_t needed, char *p);
   int _process_connection();
   void _connect();
@@ -63,7 +68,7 @@ class AsyncConnection : public Connection {
   int randomize_out_seq();
   void handle_ack(uint64_t seq);
   void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
-  int write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist);
+  int write_message(Message *m, bufferlist& bl);
   int _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
                     bufferlist authorizer_reply) {
     bufferlist reply_bl;
@@ -74,7 +79,7 @@ class AsyncConnection : public Connection {
     if (reply.authorizer_len) {
       reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
     }
-    int r = _try_send(reply_bl);
+    int r = try_send(reply_bl);
     if (r < 0)
       return -1;
 
@@ -82,25 +87,31 @@ class AsyncConnection : public Connection {
     return 0;
   }
   bool is_queued() {
+    assert(write_lock.is_locked());
     return !out_q.empty() || outcoming_bl.length();
   }
   void shutdown_socket() {
     if (sd >= 0)
       ::shutdown(sd, SHUT_RDWR);
   }
-  Message *_get_next_outgoing() {
+  Message *_get_next_outgoing(bufferlist *bl) {
+    assert(write_lock.is_locked());
     Message *m = 0;
     while (!m && !out_q.empty()) {
-      map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
-      if (!p->second.empty()) {
-        m = p->second.front();
-        p->second.pop_front();
+      map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
+      if (!it->second.empty()) {
+        list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
+        m = p->second;
+        if (bl)
+          bl->swap(p->first);
+        it->second.erase(p);
       }
-      if (p->second.empty())
-        out_q.erase(p->first);
+      if (it->second.empty())
+        out_q.erase(it->first);
     }
     return m;
   }
+
  public:
   AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
   ~AsyncConnection();
@@ -206,19 +217,24 @@ class AsyncConnection : public Connection {
   AsyncMessenger *async_msgr;
   int global_seq;
   __u32 connect_seq, peer_global_seq;
-  uint64_t out_seq;
+  atomic_t out_seq;
   uint64_t in_seq, in_seq_acked;
   int state;
   int state_after_send;
   int sd;
   int port;
   Messenger::Policy policy;
-  map<int, list<Message*> > out_q;  // priority queue for outbound msgs
-  list<Message*> sent;
+
+  Mutex write_lock;
+  int can_write;  // 0. can't send 1. can send_message 2. connection is closed
+  bool open_write;
+  map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
+  list<pair<bufferlist, Message*> > sent; // the first bufferlist need to inject seq
   list<Message*> local_messages;    // local deliver
+  bufferlist outcoming_bl;
+
   Mutex lock;
   utime_t backoff;         // backoff time
-  bool open_write;
   EventCallbackRef read_handler;
   EventCallbackRef write_handler;
   EventCallbackRef reset_handler;
@@ -265,7 +281,6 @@ class AsyncConnection : public Connection {
   char *state_buffer;
   // used only by "read_until"
   uint64_t state_offset;
-  bufferlist outcoming_bl;
   NetHandler net;
   EventCenter *center;
   ceph::shared_ptr<AuthSessionHandler> session_security;
index 1d32b8f18eaba09f6a3e1e2d0f9d2eb27fc19aa3..1a23b92edff63af186b04fd8c9c41f98435e468e 100644 (file)
@@ -745,10 +745,10 @@ class SyntheticDispatcher : public Dispatcher {
     ::decode(i, blp);
     ::decode(reply, blp);
     if (reply) {
-      cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
+      //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
       reply_message(m, i);
     } else if (sent.count(i)) {
-      cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
+      //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl;
       ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
       ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
       conn_sent[m->get_connection()].pop_front();
@@ -776,6 +776,7 @@ class SyntheticDispatcher : public Dispatcher {
     if (m->get_middle().length())
       rm->set_middle(bl);
     m->get_connection()->send_message(rm);
+    //cerr << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << std::endl;
   }
 
   void send_message_wrap(ConnectionRef con, Message *m) {
@@ -791,6 +792,7 @@ class SyntheticDispatcher : public Dispatcher {
         sent[i] = m->get_data();
         conn_sent[con].push_back(i);
       }
+      //cerr << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << std::endl;
     }
     ASSERT_EQ(con->send_message(m), 0);
   }