]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: rename outcoming_bl -> outgoing_bl in AsyncConnection. 30709/head
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 3 Oct 2019 13:39:15 +0000 (15:39 +0200)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 3 Oct 2019 13:39:15 +0000 (15:39 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV2.cc

index 6e771a99e15236fca1e767f58e6b81ca474c36c7..68d6499099ef22d9ae9ef86c88d559e95e478b2c 100644 (file)
@@ -304,7 +304,7 @@ ssize_t AsyncConnection::write(bufferlist &bl,
                                bool more) {
 
     std::unique_lock<std::mutex> l(write_lock);
-    outcoming_bl.claim_append(bl);
+    outgoing_bl.claim_append(bl);
     ssize_t r = _try_send(more);
     if (r > 0) {
       writeCallback = callback;
@@ -324,16 +324,16 @@ ssize_t AsyncConnection::_try_send(bool more)
   }
 
   ceph_assert(center->in_thread());
-  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length()
+  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
                              << " bytes" << dendl;
-  ssize_t r = cs.send(outcoming_bl, more);
+  ssize_t r = cs.send(outgoing_bl, more);
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
     return r;
   }
 
   ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
-                             << " remaining bytes " << outcoming_bl.length() << dendl;
+                             << " remaining bytes " << outgoing_bl.length() << dendl;
 
   if (!open_write && is_queued()) {
     center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
@@ -348,7 +348,7 @@ ssize_t AsyncConnection::_try_send(bool more)
     }
   }
 
-  return outcoming_bl.length();
+  return outgoing_bl.length();
 }
 
 void AsyncConnection::inject_delay() {
@@ -600,7 +600,7 @@ void AsyncConnection::fault()
 
   recv_start = recv_end = 0;
   state_offset = 0;
-  outcoming_bl.clear();
+  outgoing_bl.clear();
 }
 
 void AsyncConnection::_stop() {
@@ -618,7 +618,7 @@ void AsyncConnection::_stop() {
 }
 
 bool AsyncConnection::is_queued() const {
-  return outcoming_bl.length();
+  return outgoing_bl.length();
 }
 
 void AsyncConnection::shutdown_socket() {
index 3ce26e6d52a3925f5a3d8e62b5cbec89fb911160..9f6e004fe2be7b7e22a424e9667a3cf892820489 100644 (file)
@@ -170,7 +170,7 @@ class AsyncConnection : public Connection {
   DispatchQueue *dispatch_queue;
 
   // lockfree, only used in own thread
-  bufferlist outcoming_bl;
+  bufferlist outgoing_bl;
   bool open_write = false;
 
   std::mutex write_lock;
index 9f6160d7ef0844bda6a637a147b7a50e969825be..2cdbfb2e9ebac47e37baa855fe3fd08247b7c00d 100644 (file)
@@ -361,8 +361,8 @@ void ProtocolV1::write_event() {
       if (left) {
         ceph_le64 s;
         s = in_seq;
-        connection->outcoming_bl.append(CEPH_MSGR_TAG_ACK);
-        connection->outcoming_bl.append((char *)&s, sizeof(s));
+        connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK);
+        connection->outgoing_bl.append((char *)&s, sizeof(s));
         ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                        << " messages" << dendl;
         ack_left -= left;
@@ -550,16 +550,16 @@ void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
     ceph_assert(tp);
     struct ceph_timespec ts;
     tp->encode_timeval(&ts);
-    connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
-    connection->outcoming_bl.append((char *)&ts, sizeof(ts));
+    connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
+    connection->outgoing_bl.append((char *)&ts, sizeof(ts));
   } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
     struct ceph_timespec ts;
     utime_t t = ceph_clock_now();
     t.encode_timeval(&ts);
-    connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
-    connection->outcoming_bl.append((char *)&ts, sizeof(ts));
+    connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
+    connection->outgoing_bl.append((char *)&ts, sizeof(ts));
   } else {
-    connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
+    connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
   }
 }
 
@@ -1075,9 +1075,9 @@ void ProtocolV1::session_reset() {
 
   connection->dispatch_queue->discard_queue(connection->conn_id);
   discard_out_queue();
-  // note: we need to clear outcoming_bl here, but session_reset may be
+  // note: we need to clear outgoing_bl here, but session_reset may be
   // called by other thread, so let caller clear this itself!
-  // outcoming_bl.clear();
+  // outgoing_bl.clear();
 
   connection->dispatch_queue->queue_remote_reset(connection);
 
@@ -1133,8 +1133,8 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
     }
   }
 
-  connection->outcoming_bl.append(CEPH_MSGR_TAG_MSG);
-  connection->outcoming_bl.append((char *)&header, sizeof(header));
+  connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG);
+  connection->outgoing_bl.append((char *)&header, sizeof(header));
 
   ldout(cct, 20) << __func__ << " sending message type=" << header.type
                  << " src " << entity_name_t(header.src)
@@ -1143,17 +1143,17 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
 
   if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
     for (const auto &pb : bl.buffers()) {
-      connection->outcoming_bl.append((char *)pb.c_str(), pb.length());
+      connection->outgoing_bl.append((char *)pb.c_str(), pb.length());
     }
   } else {
-    connection->outcoming_bl.claim_append(bl);
+    connection->outgoing_bl.claim_append(bl);
   }
 
   // send footer; if receiver doesn't support signatures, use the old footer
   // format
   ceph_msg_footer_old old_footer;
   if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
-    connection->outcoming_bl.append((char *)&footer, sizeof(footer));
+    connection->outgoing_bl.append((char *)&footer, sizeof(footer));
   } else {
     if (messenger->crcflags & MSG_CRC_HEADER) {
       old_footer.front_crc = footer.front_crc;
@@ -1164,20 +1164,20 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
     old_footer.data_crc =
         messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
     old_footer.flags = footer.flags;
-    connection->outcoming_bl.append((char *)&old_footer, sizeof(old_footer));
+    connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer));
   }
 
   m->trace.event("async writing message");
   ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
                  << dendl;
-  ssize_t total_send_size = connection->outcoming_bl.length();
+  ssize_t total_send_size = connection->outgoing_bl.length();
   ssize_t rc = connection->_try_send(more);
   if (rc < 0) {
     ldout(cct, 1) << __func__ << " error sending " << m << ", "
                   << cpp_strerror(rc) << dendl;
   } else {
     connection->logger->inc(
-        l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
     ldout(cct, 10) << __func__ << " sending " << m
                    << (rc ? " continuely." : " done.") << dendl;
   }
@@ -1668,7 +1668,7 @@ CtPtr ProtocolV1::handle_connect_reply_2() {
     connect_seq = 0;
 
     // see session_reset
-    connection->outcoming_bl.clear();
+    connection->outgoing_bl.clear();
 
     return CONTINUE(send_connect_message);
   }
@@ -2348,7 +2348,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
             std::lock_guard<std::mutex> l(existing->lock);
             existing->write_lock.lock();
             exproto->requeue_sent();
-            existing->outcoming_bl.clear();
+            existing->outgoing_bl.clear();
             existing->open_write = false;
             existing->write_lock.unlock();
             if (exproto->state == NONE) {
index af02cc2d7291d2bfb24db7d299c86fe1c13089bc..315a7ab2abb3fac1c757b53db0b8a3eb1c1ac64f 100644 (file)
@@ -145,7 +145,7 @@ void ProtocolV2::reset_session() {
 
   connection->dispatch_queue->discard_queue(connection->conn_id);
   discard_out_queue();
-  connection->outcoming_bl.clear();
+  connection->outgoing_bl.clear();
 
   connection->dispatch_queue->queue_remote_reset(connection);
 
@@ -508,7 +508,7 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
                             m->get_payload(),
                             m->get_middle(),
                             m->get_data());
-  connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
+  connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));
 
   ldout(cct, 5) << __func__ << " sending message m=" << m
                 << " seq=" << m->get_seq() << " " << *m << dendl;
@@ -518,14 +518,14 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
                  << " src=" << entity_name_t(messenger->get_myname())
                  << " off=" << header2.data_off
                  << dendl;
-  ssize_t total_send_size = connection->outcoming_bl.length();
+  ssize_t total_send_size = connection->outgoing_bl.length();
   ssize_t rc = connection->_try_send(more);
   if (rc < 0) {
     ldout(cct, 1) << __func__ << " error sending " << m << ", "
                   << cpp_strerror(rc) << dendl;
   } else {
     connection->logger->inc(
-        l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
     ldout(cct, 10) << __func__ << " sending " << m
                    << (rc ? " continuely." : " done.") << dendl;
   }
@@ -544,12 +544,12 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
 void ProtocolV2::append_keepalive() {
   ldout(cct, 10) << __func__ << dendl;
   auto keepalive_frame = KeepAliveFrame::Encode();
-  connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+  connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
 }
 
 void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
   auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
-  connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+  connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
 }
 
 void ProtocolV2::handle_message_ack(uint64_t seq) {
@@ -639,7 +639,7 @@ void ProtocolV2::write_event() {
       uint64_t left = ack_left;
       if (left) {
         auto ack = AckFrame::Encode(in_seq);
-        connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
+        connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
         ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                        << " messages" << dendl;
         ack_left -= left;
@@ -2715,7 +2715,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
           std::lock_guard<std::mutex> l(existing->lock);
           existing->write_lock.lock();
           exproto->requeue_sent();
-          existing->outcoming_bl.clear();
+          existing->outgoing_bl.clear();
           existing->open_write = false;
           existing->write_lock.unlock();
           if (exproto->state == NONE) {