auto action = conn.interceptor->intercept(
conn.get_local_shared_foreign_from_this(),
Breakpoint{tag, type});
- socket->set_trap(type, action, &conn.interceptor->blocker);
+ // tolerate leaking future in tests
+ std::ignore = seastar::smp::submit_to(
+ socket->get_shard_id(),
+ [this, type, action] {
+ socket->set_trap(type, action, &conn.interceptor->blocker);
+ });
}
}
#endif
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
+ // Note: may not invoke on the socket core
socket->learn_ephemeral_port_as_connector(port);
}
});
}
+template <bool may_cross_core>
seastar::future<ceph::bufferptr>
FrameAssemblerV2::read_exactly(std::size_t bytes)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- return socket->read_exactly(bytes
- ).then([this](auto bptr) {
- rxbuf.append(bptr);
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, bytes] {
+ return socket->read_exactly(bytes);
+ }).then([this](auto bptr) {
+ if (record_io) {
+ rxbuf.append(bptr);
+ }
return bptr;
});
} else {
+ assert(socket->get_shard_id() == sid);
return socket->read_exactly(bytes);
- };
+ }
}
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<true>(std::size_t);
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<false>(std::size_t);
+template <bool may_cross_core>
seastar::future<ceph::bufferlist>
FrameAssemblerV2::read(std::size_t bytes)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- return socket->read(bytes
- ).then([this](auto buf) {
- rxbuf.append(buf);
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, bytes] {
+ return socket->read(bytes);
+ }).then([this](auto buf) {
+ if (record_io) {
+ rxbuf.append(buf);
+ }
return buf;
});
} else {
+ assert(socket->get_shard_id() == sid);
return socket->read(bytes);
}
}
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<true>(std::size_t);
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<false>(std::size_t);
+template <bool may_cross_core>
seastar::future<>
FrameAssemblerV2::write(ceph::bufferlist buf)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- txbuf.append(buf);
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ if (record_io) {
+ txbuf.append(buf);
+ }
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+ return socket->write(std::move(buf));
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->write(std::move(buf));
}
- return socket->write(std::move(buf));
}
+template seastar::future<> FrameAssemblerV2::write<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write<false>(ceph::bufferlist);
+template <bool may_cross_core>
seastar::future<>
FrameAssemblerV2::flush()
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
- return socket->flush();
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this] {
+ return socket->flush();
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->flush();
+ }
}
+template seastar::future<> FrameAssemblerV2::flush<true>();
+template seastar::future<> FrameAssemblerV2::flush<false>();
+template <bool may_cross_core>
seastar::future<>
FrameAssemblerV2::write_flush(ceph::bufferlist buf)
{
assert(seastar::this_shard_id() == sid);
assert(has_socket());
- if (unlikely(record_io)) {
- txbuf.append(buf);
+ if constexpr (may_cross_core) {
+ assert(conn.get_messenger_shard_id() == sid);
+ if (unlikely(record_io)) {
+ txbuf.append(buf);
+ }
+ return seastar::smp::submit_to(
+ socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+ return socket->write_flush(std::move(buf));
+ });
+ } else {
+ assert(socket->get_shard_id() == sid);
+ return socket->write_flush(std::move(buf));
}
- return socket->write_flush(std::move(buf));
}
+template seastar::future<> FrameAssemblerV2::write_flush<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write_flush<false>(ceph::bufferlist);
+template <bool may_cross_core>
seastar::future<FrameAssemblerV2::read_main_t>
FrameAssemblerV2::read_main_preamble()
{
assert(seastar::this_shard_id() == sid);
rx_preamble.clear();
- return read_exactly(rx_frame_asm.get_preamble_onwire_len()
+ return read_exactly<may_cross_core>(
+ rx_frame_asm.get_preamble_onwire_len()
).then([this](auto bptr) {
try {
rx_preamble.append(std::move(bptr));
}
});
}
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<true>();
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<false>();
+template <bool may_cross_core>
seastar::future<FrameAssemblerV2::read_payload_t*>
FrameAssemblerV2::read_frame_payload()
{
}
uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
// TODO: create aligned and contiguous buffer from socket
- return read_exactly(onwire_len
+ return read_exactly<may_cross_core>(onwire_len
).then([this](auto bptr) {
logger().trace("{} RECV({}) frame segment[{}]",
conn, bptr.length(), rx_segments_data.size());
});
}
).then([this] {
- return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
+ return read_exactly<may_cross_core>(rx_frame_asm.get_epilogue_onwire_len());
}).then([this](auto bptr) {
logger().trace("{} RECV({}) frame epilogue", conn, bptr.length());
bool ok = false;
return &rx_segments_data;
});
}
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<true>();
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<false>();
void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
{
* socket read and write interfaces
*/
+ template <bool may_cross_core = true>
seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes);
+ template <bool may_cross_core = true>
seastar::future<ceph::bufferlist> read(std::size_t bytes);
+ template <bool may_cross_core = true>
seastar::future<> write(ceph::bufferlist);
+ template <bool may_cross_core = true>
seastar::future<> flush();
+ template <bool may_cross_core = true>
seastar::future<> write_flush(ceph::bufferlist);
/*
ceph::msgr::v2::Tag tag;
const ceph::msgr::v2::FrameAssembler *rx_frame_asm;
};
+ template <bool may_cross_core = true>
seastar::future<read_main_t> read_main_preamble();
/// may throw negotiation_failure as fault
using read_payload_t = ceph::msgr::v2::segment_bls_t;
// FIXME: read_payload_t cannot be no-throw move constructible
+ template <bool may_cross_core = true>
seastar::future<read_payload_t*> read_frame_payload();
template <class F>
return bl;
}
- template <class F>
+ template <class F, bool may_cross_core = true>
seastar::future<> write_flush_frame(F &tx_frame) {
assert(seastar::this_shard_id() == sid);
auto bl = get_buffer(tx_frame);
- return write_flush(std::move(bl));
+ return write_flush<may_cross_core>(std::move(bl));
}
static FrameAssemblerV2Ref create(SocketConnection &conn);
// different from the socket sid.
bool is_socket_shutdown = false;
+ // the current working shard, can be messenger or socket shard.
+ // if is messenger shard, should call interfaces with may_cross_core = true.
seastar::shard_id sid;
/*
* auth signature
+ *
+ * only in the messenger core
*/
bool record_io = false;
&session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
&session_comp_handlers};
+ // in the messenger core during handshake,
+ // and in the socket core during open,
+ // must be cleaned before switching cores.
+
ceph::bufferlist rx_preamble;
read_payload_t rx_segments_data;
seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
assert(!is_out_queued());
- return frame_assembler->flush(
+ return frame_assembler->flush<false>(
).then([this] {
if (!is_out_queued()) {
// still nothing pending to send after flush,
}
auto to_ack = ack_left;
assert(to_ack == 0 || in_seq > 0);
- return frame_assembler->write(
+ return frame_assembler->write<false>(
sweep_out_pending_msgs_to_sent(
need_keepalive, next_keepalive_ack, to_ack > 0)
).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
seastar::future<>
IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
{
- return frame_assembler->read_frame_payload(
+ return frame_assembler->read_frame_payload<false>(
).then([this, throttle_stamp, msg_size](auto payload) {
if (unlikely(io_state != io_state_t::open)) {
logger().debug("{} triggered {} during read_message()",
in_exit_dispatching = seastar::promise<>();
gate.dispatch_in_background("do_in_dispatch", conn, [this] {
return seastar::keep_doing([this] {
- return frame_assembler->read_main_preamble(
+ return frame_assembler->read_main_preamble<false>(
).then([this](auto ret) {
switch (ret.tag) {
case Tag::MESSAGE: {
});
}
case Tag::ACK:
- return frame_assembler->read_frame_payload(
+ return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_message_ack() logic
auto ack = AckFrame::Decode(payload->back());
ack_out_sent(ack.seq());
});
case Tag::KEEPALIVE2:
- return frame_assembler->read_frame_payload(
+ return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_keepalive2() logic
auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
last_keepalive = seastar::lowres_system_clock::now();
});
case Tag::KEEPALIVE2_ACK:
- return frame_assembler->read_frame_payload(
+ return frame_assembler->read_frame_payload<false>(
).then([this](auto payload) {
// handle_keepalive2_ack() logic
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());