From: Radoslaw Zarzynski Date: Thu, 3 Oct 2019 13:39:15 +0000 (+0200) Subject: msg/async: rename outcoming_bl -> outgoing_bl in AsyncConnection. X-Git-Tag: v14.2.10~200^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=edc7b50dee599457b8788c0004d893b35d16b404;p=ceph.git msg/async: rename outcoming_bl -> outgoing_bl in AsyncConnection. Signed-off-by: Radoslaw Zarzynski (cherry picked from commit 7997a3ea193344ea9782d2594cca295ac5bdf59d) --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index cab4145de715..b78d84a3d527 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -298,7 +298,7 @@ ssize_t AsyncConnection::write(bufferlist &bl, bool more) { std::unique_lock l(write_lock); - outcoming_bl.claim_append(bl); + outgoing_bl.claim_append(bl); ssize_t r = _try_send(more); if (r > 0) { writeCallback = callback; @@ -318,16 +318,16 @@ ssize_t AsyncConnection::_try_send(bool more) } ceph_assert(center->in_thread()); - ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length() + ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length() << " bytes" << dendl; - ssize_t r = cs.send(outcoming_bl, more); + ssize_t r = cs.send(outgoing_bl, more); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; return r; } ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r - << " remaining bytes " << outcoming_bl.length() << dendl; + << " remaining bytes " << outgoing_bl.length() << dendl; if (!open_write && is_queued()) { center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler); @@ -342,7 +342,7 @@ ssize_t AsyncConnection::_try_send(bool more) } } - return outcoming_bl.length(); + return outgoing_bl.length(); } void AsyncConnection::inject_delay() { @@ -578,7 +578,7 @@ void AsyncConnection::fault() recv_start = recv_end = 0; state_offset = 0; - outcoming_bl.clear(); + outgoing_bl.clear(); } void AsyncConnection::_stop() { @@ -596,7 +596,7 @@ void AsyncConnection::_stop() { } bool AsyncConnection::is_queued() const { - return outcoming_bl.length(); + return outgoing_bl.length(); } void AsyncConnection::shutdown_socket() { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 5b914cc5e639..0c2512c852f1 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -170,7 +170,7 @@ class AsyncConnection : public Connection { DispatchQueue *dispatch_queue; // lockfree, only used in own thread - bufferlist outcoming_bl; + bufferlist outgoing_bl; bool open_write = false; std::mutex write_lock; diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 590717160668..5840eaf49948 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -363,8 +363,8 @@ void ProtocolV1::write_event() { if (left) { ceph_le64 s; s = in_seq; - connection->outcoming_bl.append(CEPH_MSGR_TAG_ACK); - connection->outcoming_bl.append((char *)&s, sizeof(s)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK); + connection->outgoing_bl.append((char *)&s, sizeof(s)); ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left -= left; @@ -552,16 +552,16 @@ void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) { ceph_assert(tp); struct ceph_timespec ts; tp->encode_timeval(&ts); - connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); - connection->outcoming_bl.append((char *)&ts, sizeof(ts)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); + connection->outgoing_bl.append((char *)&ts, sizeof(ts)); } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { struct ceph_timespec ts; utime_t t = ceph_clock_now(); t.encode_timeval(&ts); - connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2); - connection->outcoming_bl.append((char *)&ts, sizeof(ts)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2); + connection->outgoing_bl.append((char *)&ts, sizeof(ts)); } else { - connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE); + connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE); } } @@ -1060,9 +1060,9 @@ void ProtocolV1::session_reset() { connection->dispatch_queue->discard_queue(connection->conn_id); discard_out_queue(); - // note: we need to clear outcoming_bl here, but session_reset may be + // note: we need to clear outgoing_bl here, but session_reset may be // called by other thread, so let caller clear this itself! - // outcoming_bl.clear(); + // outgoing_bl.clear(); connection->dispatch_queue->queue_remote_reset(connection); @@ -1118,8 +1118,8 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { } } - connection->outcoming_bl.append(CEPH_MSGR_TAG_MSG); - connection->outcoming_bl.append((char *)&header, sizeof(header)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG); + connection->outgoing_bl.append((char *)&header, sizeof(header)); ldout(cct, 20) << __func__ << " sending message type=" << header.type << " src " << entity_name_t(header.src) @@ -1128,17 +1128,17 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) { for (const auto &pb : bl.buffers()) { - connection->outcoming_bl.append((char *)pb.c_str(), pb.length()); + connection->outgoing_bl.append((char *)pb.c_str(), pb.length()); } } else { - connection->outcoming_bl.claim_append(bl); + connection->outgoing_bl.claim_append(bl); } // send footer; if receiver doesn't support signatures, use the old footer // format ceph_msg_footer_old old_footer; if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) { - connection->outcoming_bl.append((char *)&footer, sizeof(footer)); + connection->outgoing_bl.append((char *)&footer, sizeof(footer)); } else { if (messenger->crcflags & MSG_CRC_HEADER) { old_footer.front_crc = footer.front_crc; @@ -1150,20 +1150,20 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { old_footer.data_crc = messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0; old_footer.flags = footer.flags; - connection->outcoming_bl.append((char *)&old_footer, sizeof(old_footer)); + connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer)); } m->trace.event("async writing message"); ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m << dendl; - ssize_t total_send_size = connection->outcoming_bl.length(); + ssize_t total_send_size = connection->outgoing_bl.length(); ssize_t rc = connection->_try_send(more); if (rc < 0) { ldout(cct, 1) << __func__ << " error sending " << m << ", " << cpp_strerror(rc) << dendl; } else { connection->logger->inc( - l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length()); + l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length()); ldout(cct, 10) << __func__ << " sending " << m << (rc ? " continuely." : " done.") << dendl; } @@ -1602,7 +1602,7 @@ CtPtr ProtocolV1::handle_connect_reply_2() { connect_seq = 0; // see session_reset - connection->outcoming_bl.clear(); + connection->outgoing_bl.clear(); return CONTINUE(send_connect_message); } @@ -2290,7 +2290,7 @@ CtPtr ProtocolV1::replace(AsyncConnectionRef existing, std::lock_guard l(existing->lock); existing->write_lock.lock(); exproto->requeue_sent(); - existing->outcoming_bl.clear(); + existing->outgoing_bl.clear(); existing->open_write = false; existing->write_lock.unlock(); if (exproto->state == NONE) { diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 4b03f5ebcf46..678516f9f963 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -145,7 +145,7 @@ void ProtocolV2::reset_session() { connection->dispatch_queue->discard_queue(connection->conn_id); discard_out_queue(); - connection->outcoming_bl.clear(); + connection->outgoing_bl.clear(); connection->dispatch_queue->queue_remote_reset(connection); @@ -512,7 +512,7 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { m->get_payload(), m->get_middle(), m->get_data()); - connection->outcoming_bl.append(message.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(message.get_buffer(session_stream_handlers)); ldout(cct, 5) << __func__ << " sending message m=" << m << " seq=" << m->get_seq() << " " << *m << dendl; @@ -522,14 +522,14 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { << " src=" << entity_name_t(messenger->get_myname()) << " off=" << header2.data_off << dendl; - ssize_t total_send_size = connection->outcoming_bl.length(); + ssize_t total_send_size = connection->outgoing_bl.length(); ssize_t rc = connection->_try_send(more); if (rc < 0) { ldout(cct, 1) << __func__ << " error sending " << m << ", " << cpp_strerror(rc) << dendl; } else { connection->logger->inc( - l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length()); + l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length()); ldout(cct, 10) << __func__ << " sending " << m << (rc ? " continuely." : " done.") << dendl; } @@ -545,12 +545,12 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { void ProtocolV2::append_keepalive() { ldout(cct, 10) << __func__ << dendl; auto keepalive_frame = KeepAliveFrame::Encode(); - connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers)); } void ProtocolV2::append_keepalive_ack(utime_t ×tamp) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp); - connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); } void ProtocolV2::handle_message_ack(uint64_t seq) { @@ -631,7 +631,7 @@ void ProtocolV2::write_event() { ceph_le64 s; s = in_seq; auto ack = AckFrame::Encode(in_seq); - connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers)); ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left -= left; @@ -2694,7 +2694,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, std::lock_guard l(existing->lock); existing->write_lock.lock(); exproto->requeue_sent(); - existing->outcoming_bl.clear(); + existing->outgoing_bl.clear(); existing->open_write = false; existing->write_lock.unlock(); if (exproto->state == NONE) {