return socket->close();
}
-seastar::future<Socket::tmp_buf>
+seastar::future<ceph::bufferptr>
FrameAssemblerV2::read_exactly(std::size_t bytes)
{
assert(has_socket());
if (unlikely(record_io)) {
return socket->read_exactly(bytes
- ).then([this](auto bl) {
- rxbuf.append(buffer::create(bl.share()));
- return bl;
+ ).then([this](auto bptr) {
+ rxbuf.append(bptr);
+ return bptr;
});
} else {
return socket->read_exactly(bytes);
}
seastar::future<>
-FrameAssemblerV2::write(ceph::bufferlist &&buf)
+FrameAssemblerV2::write(ceph::bufferlist buf)
{
assert(has_socket());
if (unlikely(record_io)) {
}
seastar::future<>
-FrameAssemblerV2::write_flush(ceph::bufferlist &&buf)
+FrameAssemblerV2::write_flush(ceph::bufferlist buf)
{
assert(has_socket());
if (unlikely(record_io)) {
{
rx_preamble.clear();
return read_exactly(rx_frame_asm.get_preamble_onwire_len()
- ).then([this](auto bl) {
+ ).then([this](auto bptr) {
try {
- rx_preamble.append(buffer::create(std::move(bl)));
+ rx_preamble.append(std::move(bptr));
const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
#ifdef UNIT_TESTS_BUILT
intercept_frame(tag, false);
uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
// TODO: create aligned and contiguous buffer from socket
return read_exactly(onwire_len
- ).then([this](auto tmp_bl) {
+ ).then([this](auto bptr) {
logger().trace("{} RECV({}) frame segment[{}]",
- conn, tmp_bl.size(), rx_segments_data.size());
+ conn, bptr.length(), rx_segments_data.size());
bufferlist segment;
- segment.append(buffer::create(std::move(tmp_bl)));
+ segment.append(std::move(bptr));
rx_segments_data.emplace_back(std::move(segment));
});
}
).then([this] {
return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
- }).then([this](auto bl) {
- logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
+ }).then([this](auto bptr) {
+ logger().trace("{} RECV({}) frame epilogue", conn, bptr.length());
bool ok = false;
try {
bufferlist rx_epilogue;
- rx_epilogue.append(buffer::create(std::move(bl)));
+ rx_epilogue.append(std::move(bptr));
ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
} catch (FrameError& e) {
logger().error("read_frame_payload: {} {}", conn, e.what());
* socket read and write interfaces
*/
- seastar::future<Socket::tmp_buf> read_exactly(std::size_t bytes);
+ seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes);
seastar::future<ceph::bufferlist> read(std::size_t bytes);
- seastar::future<> write(ceph::bufferlist &&);
+ seastar::future<> write(ceph::bufferlist);
seastar::future<> flush();
- seastar::future<> write_flush(ceph::bufferlist &&);
+ seastar::future<> write_flush(ceph::bufferlist);
/*
* frame read and write interfaces
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
- return frame_assembler->read_exactly(banner_len); // or read exactly?
- }).then([this] (auto bl) {
+ return frame_assembler->read_exactly(banner_len);
+ }).then([this](auto bptr) {
// 3. process peer banner and read banner_payload
unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
logger().debug("{} RECV({}) banner: \"{}\"",
- conn, bl.size(),
- std::string((const char*)bl.get(), banner_prefix_len));
+ conn, bptr.length(),
+ std::string(bptr.c_str(), banner_prefix_len));
- if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
- if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+ if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+ if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
logger().warn("{} peer is using V1 protocol", conn);
} else {
logger().warn("{} peer sent bad banner", conn);
}
abort_in_fault();
}
- bl.trim_front(banner_prefix_len);
+
+ bptr.set_offset(bptr.offset() + banner_prefix_len);
+ bptr.set_length(bptr.length() - banner_prefix_len);
+ assert(bptr.length() == sizeof(ceph_le16));
uint16_t payload_len;
bufferlist buf;
- buf.append(buffer::create(std::move(bl)));
+ buf.append(std::move(bptr));
auto ti = buf.cbegin();
try {
decode(payload_len, ti);
trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
gated_execute("execute_server_wait", conn, [this] {
return frame_assembler->read_exactly(1
- ).then([this](auto bl) {
+ ).then([this](auto bptr) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();
}).handle_exception([this](std::exception_ptr eptr) {
#include <seastar/core/sleep.hh>
#include <seastar/core/when_all.hh>
+#include <seastar/net/packet.hh>
#include "crimson/common/log.h"
#include "Errors.h"
return crimson::get_logger(ceph_subsys_ms);
}
-using tmp_buf = Socket::tmp_buf;
-using packet = Socket::packet;
+using tmp_buf = seastar::temporary_buffer<char>;
+using packet = seastar::net::packet;
// an input_stream consumer that reads buffer segments into a bufferlist up to
// the given number of remaining bytes
#endif
}
-seastar::future<seastar::temporary_buffer<char>>
+seastar::future<bufferptr>
Socket::read_exactly(size_t bytes) {
#ifdef UNIT_TESTS_BUILT
return try_trap_pre(next_trap_read).then([bytes, this] {
#endif
if (bytes == 0) {
- return seastar::make_ready_future<seastar::temporary_buffer<char>>();
+ return seastar::make_ready_future<bufferptr>();
}
return in.read_exactly(bytes).then([bytes](auto buf) {
- if (buf.size() < bytes) {
+ bufferptr ptr(buffer::create(buf.share()));
+ if (ptr.length() < bytes) {
throw std::system_error(make_error_code(error::read_eof));
}
inject_failure();
return inject_delay(
- ).then([buf = std::move(buf)]() mutable {
- return seastar::make_ready_future<tmp_buf>(std::move(buf));
+ ).then([ptr = std::move(ptr)]() mutable {
+ return seastar::make_ready_future<bufferptr>(std::move(ptr));
});
});
#ifdef UNIT_TESTS_BUILT
- }).then([this](auto buf) {
+ }).then([this](auto ptr) {
return try_trap_post(next_trap_read
- ).then([buf = std::move(buf)]() mutable {
- return std::move(buf);
+ ).then([ptr = std::move(ptr)]() mutable {
+ return std::move(ptr);
});
});
#endif
}
seastar::future<>
-Socket::write(packet &&buf)
+Socket::write(bufferlist buf)
{
#ifdef UNIT_TESTS_BUILT
return try_trap_pre(next_trap_write
inject_failure();
return inject_delay(
).then([buf = std::move(buf), this]() mutable {
- return out.write(std::move(buf));
+ packet p(std::move(buf));
+ return out.write(std::move(p));
});
#ifdef UNIT_TESTS_BUILT
}).then([this] {
}
seastar::future<>
-Socket::write_flush(packet &&buf)
+Socket::write_flush(bufferlist buf)
{
#ifdef UNIT_TESTS_BUILT
return try_trap_pre(next_trap_write
inject_failure();
return inject_delay(
).then([buf = std::move(buf), this]() mutable {
- return out.write(std::move(buf)
+ packet p(std::move(buf));
+ return out.write(std::move(p)
).then([this] {
return out.flush();
});
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
-#include <seastar/net/packet.hh>
#include "include/buffer.h"
/// read the requested number of bytes into a bufferlist
seastar::future<bufferlist> read(size_t bytes);
- using tmp_buf = seastar::temporary_buffer<char>;
- using packet = seastar::net::packet;
- seastar::future<tmp_buf> read_exactly(size_t bytes);
+ seastar::future<bufferptr> read_exactly(size_t bytes);
- seastar::future<> write(packet &&buf);
+ seastar::future<> write(bufferlist);
seastar::future<> flush();
- seastar::future<> write_flush(packet &&buf);
+ seastar::future<> write_flush(bufferlist);
// preemptively disable further reads or writes, can only be shutdown once.
void shutdown();
});
} else {
data[0] = write_count;
- return socket->write(seastar::net::packet(
- reinterpret_cast<const char*>(&data), sizeof(data))
+ bufferlist bl;
+ bl.append(buffer::copy(
+ reinterpret_cast<const char*>(&data), sizeof(data)));
+ return socket->write(bl
).then([this] {
return socket->flush();
}).then([this] {
});
} else {
return socket->read_exactly(DATA_SIZE * sizeof(uint64_t)
- ).then([this](auto buf) {
+ ).then([this](auto bptr) {
uint64_t read_data[DATA_SIZE];
- std::memcpy(read_data, buf.get(), DATA_SIZE * sizeof(uint64_t));
+ std::memcpy(read_data, bptr.c_str(), DATA_SIZE * sizeof(uint64_t));
verify_data_read(read_data);
});
}