#include "crimson/common/log.h"
#include "crimson/net/Errors.h"
#include "crimson/net/chained_dispatchers.h"
-#include "crimson/net/Socket.h"
#include "crimson/net/SocketConnection.h"
#include "msg/Message.h"
if (f_accept_new) {
(*f_accept_new)();
}
- if (socket) {
- socket->shutdown();
+ if (conn.socket) {
+ conn.socket->shutdown();
}
set_write_state(write_state_t::drop);
assert(!gate.is_closed());
// asynchronous operations
assert(!close_ready.valid());
close_ready = std::move(gate_closed).then([this] {
- if (socket) {
- return socket->close();
+ if (conn.socket) {
+ return conn.socket->close();
} else {
return seastar::now();
}
seastar::future<stop_t> Protocol::try_exit_sweep() {
assert(!is_queued());
- return socket->flush().then([this] {
+ return conn.socket->flush().then([this] {
if (!is_queued()) {
// still nothing pending to send after flush,
// the dispatching can ONLY stop now
auto acked = ack_left;
assert(acked == 0 || conn.in_seq > 0);
// sweep all pending writes with the concrete Protocol
- return socket->write(sweep_messages_and_move_to_sent(
+ return conn.socket->write(sweep_messages_and_move_to_sent(
num_msgs, need_keepalive, keepalive_ack, acked > 0)
).then([this, prv_keepalive_ack=keepalive_ack, acked] {
need_keepalive = false;
conn, write_state, e);
ceph_abort();
}
- socket->shutdown();
+ conn.socket->shutdown();
if (write_state == write_state_t::open) {
logger().info("{} write_event(): fault at {}, going to delay -- {}",
conn, write_state, e);
#include "chained_dispatchers.h"
#include "Errors.h"
-#include "Socket.h"
#include "SocketConnection.h"
#include "SocketMessenger.h"
}
#define INTERCEPT_CUSTOM(bp, type) \
-intercept({bp}, type, conn, socket)
+intercept({bp}, type, conn, conn.socket)
#define INTERCEPT_FRAME(tag, type) \
intercept({static_cast<Tag>(tag), type}, \
- type, conn, socket)
+ type, conn, conn.socket)
#define INTERCEPT_N_RW(bp) \
if (conn.interceptor) { \
const entity_name_t& _peer_name)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!socket);
+ ceph_assert(!conn.socket);
ceph_assert(!gate.is_closed());
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!socket);
+ ceph_assert(!conn.socket);
// until we know better
conn.target_addr = _peer_addr;
- socket = std::move(sock);
+ conn.socket = std::move(sock);
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
{
if (unlikely(record_io)) {
- return socket->read_exactly(bytes)
- .then([this] (auto bl) {
+ return conn.socket->read_exactly(bytes
+ ).then([this] (auto bl) {
rxbuf.append(buffer::create(bl.share()));
return bl;
});
} else {
- return socket->read_exactly(bytes);
+ return conn.socket->read_exactly(bytes);
};
}
seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
{
if (unlikely(record_io)) {
- return socket->read(bytes)
- .then([this] (auto buf) {
+ return conn.socket->read(bytes
+ ).then([this] (auto buf) {
rxbuf.append(buf);
return buf;
});
} else {
- return socket->read(bytes);
+ return conn.socket->read(bytes);
}
}
if (unlikely(record_io)) {
txbuf.append(buf);
}
- return socket->write(std::move(buf));
+ return conn.socket->write(std::move(buf));
}
seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
if (unlikely(record_io)) {
txbuf.append(buf);
}
- return socket->write_flush(std::move(buf));
+ return conn.socket->write_flush(std::move(buf));
}
size_t ProtocolV2::get_current_msg_size() const
void ProtocolV2::execute_connecting()
{
trigger_state(state_t::CONNECTING, write_state_t::delay, false);
- if (socket) {
- socket->shutdown();
+ if (conn.socket) {
+ conn.socket->shutdown();
}
gated_execute("execute_connecting", [this] {
global_seq = messenger.get_global_seq();
conn, get_state_name(state));
abort_protocol();
}
- if (socket) {
+ if (conn.socket) {
gate.dispatch_in_background("close_sockect_connecting", *this,
- [sock = std::move(socket)] () mutable {
+ [sock = std::move(conn.socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
}
abort_protocol();
});
}
- socket = std::move(sock);
+ conn.socket = std::move(sock);
return seastar::now();
}).then([this] {
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
conn, get_state_name(state));
abort_protocol();
}
- socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+ conn.socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
conn, _my_addr_from_peer);
{
existing_proto->trigger_replacing(reconnect,
do_reset,
- std::move(socket),
+ std::move(conn.socket),
std::move(auth_meta),
std::move(session_stream_handlers),
peer_global_seq,
uint64_t new_msg_seq)
{
trigger_state(state_t::REPLACING, write_state_t::delay, false);
- if (socket) {
- socket->shutdown();
+ if (conn.socket) {
+ conn.socket->shutdown();
}
dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
});
}
- if (socket) {
+ if (conn.socket) {
gate.dispatch_in_background("close_socket_replacing", *this,
- [sock = std::move(socket)] () mutable {
+ [sock = std::move(conn.socket)] () mutable {
return sock->close().then([sock = std::move(sock)] {});
});
}
- socket = std::move(new_socket);
+ conn.socket = std::move(new_socket);
auth_meta = std::move(new_auth_meta);
session_stream_handlers = std::move(new_rxtx);
record_io = false;
void ProtocolV2::execute_standby()
{
trigger_state(state_t::STANDBY, write_state_t::delay, false);
- if (socket) {
- socket->shutdown();
+ if (conn.socket) {
+ conn.socket->shutdown();
}
}
void ProtocolV2::execute_wait(bool max_backoff)
{
trigger_state(state_t::WAIT, write_state_t::delay, false);
- if (socket) {
- socket->shutdown();
+ if (conn.socket) {
+ conn.socket->shutdown();
}
gated_execute("execute_wait", [this, max_backoff] {
double backoff = protocol_timer.last_dur();