namespace crimson::net {
FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn)
- : conn{_conn}
-{}
+ : conn{_conn}, sid{seastar::this_shard_id()}
+{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+}
FrameAssemblerV2::~FrameAssemblerV2()
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ assert(seastar::this_shard_id() == sid);
if (has_socket()) {
std::ignore = move_socket();
}
// should be consistent to intercept() in ProtocolV2.cc
void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (conn.interceptor) {
auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
void FrameAssemblerV2::set_is_rev1(bool _is_rev1)
{
+ assert(seastar::this_shard_id() == sid);
is_rev1 = _is_rev1;
tx_frame_asm.set_is_rev1(_is_rev1);
rx_frame_asm.set_is_rev1(_is_rev1);
const AuthConnectionMeta &auth_meta,
bool crossed)
{
+ assert(seastar::this_shard_id() == sid);
session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
nullptr, auth_meta, is_rev1, crossed);
}
void FrameAssemblerV2::reset_handlers()
{
+ assert(seastar::this_shard_id() == sid);
session_stream_handlers = { nullptr, nullptr };
session_comp_handlers = { nullptr, nullptr };
}
FrameAssemblerV2::mover_t
FrameAssemblerV2::to_replace()
{
+ assert(seastar::this_shard_id() == sid);
assert(is_socket_valid());
return mover_t{
move_socket(),
seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
{
+ assert(seastar::this_shard_id() == sid);
record_io = false;
rxbuf.clear();
txbuf.clear();
void FrameAssemblerV2::start_recording()
{
+ assert(seastar::this_shard_id() == sid);
record_io = true;
rxbuf.clear();
txbuf.clear();
FrameAssemblerV2::record_bufs_t
FrameAssemblerV2::stop_recording()
{
+ assert(seastar::this_shard_id() == sid);
ceph_assert_always(record_io == true);
record_io = false;
return record_bufs_t{std::move(rxbuf), std::move(txbuf)};
bool FrameAssemblerV2::is_socket_valid() const
{
+ assert(seastar::this_shard_id() == sid);
return has_socket() && !socket->is_shutdown();
}
void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
{
+ assert(seastar::this_shard_id() == sid);
assert(!has_socket());
assert(new_socket);
socket = std::move(new_socket);
void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
socket->learn_ephemeral_port_as_connector(port);
}
void FrameAssemblerV2::shutdown_socket()
{
+ assert(seastar::this_shard_id() == sid);
assert(is_socket_valid());
socket->shutdown();
}
seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
assert(socket->is_shutdown());
auto old_socket = move_socket();
seastar::future<> FrameAssemblerV2::close_shutdown_socket()
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
assert(socket->is_shutdown());
return socket->close();
seastar::future<ceph::bufferptr>
FrameAssemblerV2::read_exactly(std::size_t bytes)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
return socket->read_exactly(bytes
seastar::future<ceph::bufferlist>
FrameAssemblerV2::read(std::size_t bytes)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
return socket->read(bytes
seastar::future<>
FrameAssemblerV2::write(ceph::bufferlist buf)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
txbuf.append(buf);
seastar::future<>
FrameAssemblerV2::flush()
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
return socket->flush();
}
seastar::future<>
FrameAssemblerV2::write_flush(ceph::bufferlist buf)
{
+ assert(seastar::this_shard_id() == sid);
assert(has_socket());
if (unlikely(record_io)) {
txbuf.append(buf);
seastar::future<FrameAssemblerV2::read_main_t>
FrameAssemblerV2::read_main_preamble()
{
+ assert(seastar::this_shard_id() == sid);
rx_preamble.clear();
return read_exactly(rx_frame_asm.get_preamble_onwire_len()
).then([this](auto bptr) {
seastar::future<FrameAssemblerV2::read_payload_t*>
FrameAssemblerV2::read_frame_payload()
{
+ assert(seastar::this_shard_id() == sid);
rx_segments_data.clear();
return seastar::do_until(
[this] {