#include "msg/msg_types.h"
#include "crimson/common/errorator.h"
+#include "crimson/common/local_shared_foreign_ptr.h"
class AuthConnectionMeta;
using stop_t = seastar::stop_iteration;
class Connection;
-using ConnectionRef = seastar::shared_ptr<Connection>;
-using ConnectionFRef = seastar::foreign_ptr<ConnectionRef>;
+using ConnectionLRef = seastar::shared_ptr<Connection>;
+using ConnectionFRef = seastar::foreign_ptr<ConnectionLRef>;
+using ConnectionRef = ::crimson::local_shared_foreign_ptr<ConnectionLRef>;
+
+class SocketConnection;
class Dispatcher;
class ChainedDispatchers;
struct Interceptor {
socket_blocker blocker;
virtual ~Interceptor() {}
- virtual void register_conn(Connection& conn) = 0;
- virtual void register_conn_ready(Connection& conn) = 0;
- virtual void register_conn_closed(Connection& conn) = 0;
- virtual void register_conn_replaced(Connection& conn) = 0;
- virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0;
+ // All callbacks now take the concrete SocketConnection instead of the
+ // abstract Connection: interception is a socket-level (test) concern, and
+ // implementations that need a ConnectionRef can obtain one via
+ // SocketConnection::get_local_shared_foreign_from_this().
+ virtual void register_conn(SocketConnection& conn) = 0;
+ virtual void register_conn_ready(SocketConnection& conn) = 0;
+ virtual void register_conn_closed(SocketConnection& conn) = 0;
+ virtual void register_conn_replaced(SocketConnection& conn) = 0;
+ virtual bp_action_t intercept(SocketConnection& conn, Breakpoint bp) = 0;
};
} // namespace crimson::net
// should be consistent to intercept_frame() in FrameAssemblerV2.cc
void intercept(Breakpoint bp,
bp_type_t type,
- Connection& conn,
+ SocketConnection& conn,
Interceptor *interceptor,
SocketRef& socket) {
if (interceptor) {
#endif
using std::ostream;
-using namespace crimson::net;
using crimson::common::local_conf;
+namespace crimson::net {
+
SocketConnection::SocketConnection(SocketMessenger& messenger,
ChainedDispatchers& dispatchers)
: core(messenger.shard_id()),
return socket->get_local_address();
}
+// Build a core-local ConnectionRef (local_shared_foreign_ptr wrapping a
+// foreign_ptr) that points at this connection.  Must run on the
+// connection's home shard — the assert below enforces that — because the
+// underlying shared_ptr obtained from shared_from_this() is owned there.
+ConnectionRef
+SocketConnection::get_local_shared_foreign_from_this()
+{
+ assert(seastar::this_shard_id() == shard_id());
+ return make_local_shared_foreign(
+ seastar::make_foreign(shared_from_this()));
+}
+
void SocketConnection::print(ostream& out) const {
out << (void*)this << " ";
messenger.print(out);
<< " >> " << get_peer_name() << " " << peer_addr;
}
}
+
+} // namespace crimson::net
class ProtocolV2;
class SocketMessenger;
-class SocketConnection;
using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
#ifdef UNIT_TESTS_BUILT
return messenger;
}
+ ConnectionRef get_local_shared_foreign_from_this();
+
private:
seastar::shard_id shard_id() const;
if (auto found = lookup_conn(peer_addr); found) {
logger().debug("{} connect to existing", *found);
- return found->shared_from_this();
+ return found->get_local_shared_foreign_from_this();
}
SocketConnectionRef conn =
seastar::make_shared<SocketConnection>(*this, dispatchers);
conn->start_connect(peer_addr, peer_name);
- return conn->shared_from_this();
+ return conn->get_local_shared_foreign_from_this();
}
seastar::future<> SocketMessenger::shutdown()
IOHandler::IOHandler(ChainedDispatchers &dispatchers,
SocketConnection &conn)
: dispatchers(dispatchers),
- conn(conn)
+ conn(conn),
+ // Cache a core-local ConnectionRef at construction so the dispatch_*()
+ // paths can hand out a ref without casting shared_from_this() per event.
+ conn_ref(conn.get_local_shared_foreign_from_this())
{}
IOHandler::~IOHandler()
// protocol_is_connected can be from true to true here if the replacing is
// happening to a connected connection.
protocol_is_connected = true;
- dispatchers.ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ dispatchers.ms_handle_accept(conn_ref);
}
void IOHandler::dispatch_connect()
}
ceph_assert_always(protocol_is_connected == false);
protocol_is_connected = true;
- dispatchers.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ dispatchers.ms_handle_connect(conn_ref);
}
void IOHandler::dispatch_reset(bool is_replace)
return;
}
need_dispatch_reset = false;
- dispatchers.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
- is_replace);
+ dispatchers.ms_handle_reset(conn_ref, is_replace);
}
void IOHandler::dispatch_remote_reset()
if (io_state == io_state_t::drop) {
return;
}
- dispatchers.ms_handle_remote_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ dispatchers.ms_handle_remote_reset(conn_ref);
}
void IOHandler::ack_out_sent(seq_num_t seq)
ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
ceph_le32(0), ceph_le64(0), current_header.flags};
- auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this());
Message *message = decode_message(nullptr, 0, header, footer,
- msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+ msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
if (!message) {
logger().warn("{} decode message failed", conn);
abort_in_fault();
if (is_dispatch_reset) {
dispatch_reset(is_replace);
}
+
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+
assert(!gate.is_closed());
return gate.close();
}
SocketConnection &conn;
+ // core local reference for dispatching, valid until reset/close
+ ConnectionRef conn_ref;
+
HandshakeListener *handshake_listener = nullptr;
crimson::common::Gated gate;
}
static watch_info_t create_watch_info(const OSDOp& osd_op,
- const OpsExecuter::ExecutableMessage& msg)
+ const OpsExecuter::ExecutableMessage& msg,
+ entity_addr_t peer_addr)
{
using crimson::common::local_conf;
const uint32_t timeout =
return {
osd_op.op.watch.cookie,
timeout,
- msg.get_connection()->get_peer_addr()
+ peer_addr
};
}
logger().debug("{}", __func__);
struct connect_ctx_t {
ObjectContext::watch_key_t key;
- crimson::net::ConnectionFRef conn;
+ // Core-local reference; copyable directly, no foreign_ptr transfer needed.
+ crimson::net::ConnectionRef conn;
watch_info_t info;
connect_ctx_t(
const OSDOp& osd_op,
const ExecutableMessage& msg,
- crimson::net::ConnectionFRef conn)
+ crimson::net::ConnectionRef conn)
: key(osd_op.op.watch.cookie, msg.get_reqid().name),
- conn(std::move(conn)),
- info(create_watch_info(osd_op, msg)) {
+ conn(conn),
+ // Peer address is passed explicitly now that the message no longer
+ // carries a connection (conn parameter is copied above, so dereferencing
+ // it here is safe).
+ info(create_watch_info(osd_op, msg, conn->get_peer_addr())) {
}
};
- return get_message().get_connection().copy(
- ).then([&, this](auto &&conn) {
- return with_effect_on_obc(
- connect_ctx_t{ osd_op, get_message(), std::move(conn) },
- [&] (auto& ctx) {
- const auto& entity = ctx.key.second;
- auto [it, emplaced] =
- os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
- if (emplaced) {
- logger().info("registered new watch {} by {}", it->second, entity);
- txn.nop();
- } else {
- logger().info("found existing watch {} by {}", it->second, entity);
- }
- return seastar::now();
- },
- [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
- assert(pg);
- auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
- if (emplaced) {
- const auto& [cookie, entity] = ctx.key;
- it->second = crimson::osd::Watch::create(
- obc, ctx.info, entity, std::move(pg));
- logger().info("op_effect: added new watcher: {}", ctx.key);
- } else {
- logger().info("op_effect: found existing watcher: {}", ctx.key);
- }
- return it->second->connect(std::move(ctx.conn), true /* will_ping */);
- });
- });
+
+ return with_effect_on_obc(
+ connect_ctx_t{ osd_op, get_message(), conn },
+ [&](auto& ctx) {
+ const auto& entity = ctx.key.second;
+ auto [it, emplaced] =
+ os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
+ if (emplaced) {
+ logger().info("registered new watch {} by {}", it->second, entity);
+ txn.nop();
+ } else {
+ logger().info("found existing watch {} by {}", it->second, entity);
+ }
+ return seastar::now();
+ },
+ [](auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
+ assert(pg);
+ auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
+ if (emplaced) {
+ const auto& [cookie, entity] = ctx.key;
+ it->second = crimson::osd::Watch::create(
+ obc, ctx.info, entity, std::move(pg));
+ logger().info("op_effect: added new watcher: {}", ctx.key);
+ } else {
+ logger().info("op_effect: found existing watcher: {}", ctx.key);
+ }
+ return it->second->connect(std::move(ctx.conn), true /* will_ping */);
+ }
+ );
}
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
return crimson::ct_error::enoent::make();
}
struct notify_ctx_t {
- crimson::net::ConnectionFRef conn;
+ // Core-local reference; copyable directly, no foreign_ptr transfer needed.
+ crimson::net::ConnectionRef conn;
notify_info_t ninfo;
const uint64_t client_gid;
const epoch_t epoch;
- notify_ctx_t(const ExecutableMessage& msg, crimson::net::ConnectionFRef conn)
- : conn(std::move(conn)),
+ notify_ctx_t(const ExecutableMessage& msg,
+ crimson::net::ConnectionRef conn)
+ : conn(conn),
client_gid(msg.get_reqid().name.num()),
epoch(msg.get_map_epoch()) {
}
};
- return get_message().get_connection().copy(
- ).then([&, this](auto &&conn) {
- return with_effect_on_obc(
- notify_ctx_t{ get_message(), std::move(conn) },
- [&] (auto& ctx) {
- try {
- auto bp = osd_op.indata.cbegin();
- uint32_t ver; // obsolete
- ceph::decode(ver, bp);
- ceph::decode(ctx.ninfo.timeout, bp);
- ceph::decode(ctx.ninfo.bl, bp);
- } catch (const buffer::error&) {
- ctx.ninfo.timeout = 0;
- }
- if (!ctx.ninfo.timeout) {
- using crimson::common::local_conf;
- ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
- }
- ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
- ctx.ninfo.cookie = osd_op.op.notify.cookie;
- // return our unique notify id to the client
- ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
- return seastar::now();
- },
- [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
- auto alive_watchers = obc->watchers | boost::adaptors::map_values
- | boost::adaptors::filtered(
- [] (const auto& w) {
- // FIXME: filter as for the `is_ping` in `Watch::start_notify`
- return w->is_alive();
- });
- return crimson::osd::Notify::create_n_propagate(
- std::begin(alive_watchers),
- std::end(alive_watchers),
- std::move(ctx.conn),
- ctx.ninfo,
- ctx.client_gid,
- obc->obs.oi.user_version);
- });
- });
+ return with_effect_on_obc(
+ notify_ctx_t{ get_message(), conn },
+ [&](auto& ctx) {
+ try {
+ auto bp = osd_op.indata.cbegin();
+ uint32_t ver; // obsolete
+ ceph::decode(ver, bp);
+ ceph::decode(ctx.ninfo.timeout, bp);
+ ceph::decode(ctx.ninfo.bl, bp);
+ } catch (const buffer::error&) {
+ ctx.ninfo.timeout = 0;
+ }
+ if (!ctx.ninfo.timeout) {
+ using crimson::common::local_conf;
+ ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
+ }
+ ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
+ ctx.ninfo.cookie = osd_op.op.notify.cookie;
+ // return our unique notify id to the client
+ ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
+ return seastar::now();
+ },
+ [](auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+ auto alive_watchers = obc->watchers | boost::adaptors::map_values
+ | boost::adaptors::filtered(
+ [] (const auto& w) {
+ // FIXME: filter as for the `is_ping` in `Watch::start_notify`
+ return w->is_alive();
+ });
+ return crimson::osd::Notify::create_n_propagate(
+ std::begin(alive_watchers),
+ std::end(alive_watchers),
+ std::move(ctx.conn),
+ ctx.ninfo,
+ ctx.client_gid,
+ obc->obs.oi.user_version);
+ }
+ );
}
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers(
ObjectContextRef _obc,
const OpInfo& op_info,
abstracted_msg_t&& msg,
+ crimson::net::ConnectionRef conn,
const SnapContext& _snapc)
: pg(std::move(pg)),
obc(std::move(_obc)),
op_info(op_info),
msg(std::move(msg)),
+ conn(conn),
snapc(_snapc)
{
if (op_info.may_write() && should_clone(*obc, snapc)) {
// with other message types than just the `MOSDOp`. The type erasure
// happens in the ctor of `OpsExecuter`.
struct ExecutableMessage {
- virtual const crimson::net::ConnectionFRef &get_connection() const = 0;
virtual osd_reqid_t get_reqid() const = 0;
virtual utime_t get_mtime() const = 0;
virtual epoch_t get_map_epoch() const = 0;
public:
ExecutableMessagePimpl(const ImplT* pimpl) : pimpl(pimpl) {
}
- const crimson::net::ConnectionFRef &get_connection() const final {
- return pimpl->get_connection();
- }
osd_reqid_t get_reqid() const final {
return pimpl->get_reqid();
}
ceph::static_ptr<ExecutableMessage,
sizeof(ExecutableMessagePimpl<void>)>;
abstracted_msg_t msg;
+ crimson::net::ConnectionRef conn;
std::optional<osd_op_params_t> osd_op_params;
bool user_modify = false;
ceph::os::Transaction txn;
ObjectContextRef obc,
const OpInfo& op_info,
abstracted_msg_t&& msg,
+ crimson::net::ConnectionRef conn,
const SnapContext& snapc);
public:
ObjectContextRef obc,
const OpInfo& op_info,
const MsgT& msg,
+ crimson::net::ConnectionRef conn,
const SnapContext& snapc)
: OpsExecuter(
std::move(pg),
abstracted_msg_t{
std::in_place_type_t<ExecutableMessagePimpl<MsgT>>{},
&msg},
+ conn,
snapc) {
}
__func__, m->get_hobj());
}
}
- return pg->do_osd_ops(m, obc, op_info, snapc).safe_then_unpack_interruptible(
+ return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
[this, pg, &ihref](auto submitted, auto all_completed) mutable {
return submitted.then_interruptible([this, pg, &ihref] {
return ihref.enter_stage<interruptor>(pp(*pg).wait_repop, *this);
// used by put_historic
ShardServices *put_historic_shard_services = nullptr;
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
// must be after conn due to ConnectionPipeline's life-time
Ref<MOSDOp> m;
OpInfo op_info;
spg_t get_pgid() const {
return m->get_spg();
}
- ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return instance_handle->handle; }
epoch_t get_epoch() const { return m->get_min_epoch(); }
+ ConnectionPipeline &get_connection_pipeline();
+ // Detach the op's core-local conn into a foreign_ptr so the op can be
+ // submitted to another shard; the local reference is dropped once the
+ // foreign handle has been materialized.  Pairs with
+ // finish_remote_submission() on the destination shard.
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ // Re-adopt the foreign connection handle as a core-local ConnectionRef on
+ // the destination shard; requires conn was cleared by
+ // prepare_remote_submission() first.
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
seastar::future<> with_pg_int(
ShardServices &shard_services, Ref<PG> pg);
IRef ref = this;
return interruptor::with_interruption([this, pg] {
- return pg->do_update_log_missing(req);
+ return pg->do_update_log_missing(req, conn);
}, [ref](std::exception_ptr) { return seastar::now(); }, pg);
}
spg_t get_pgid() const {
return req->get_spg();
}
- ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
private:
ClientRequest::PGPipeline &pp(PG &pg);
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
// must be after `conn` to ensure the ConnectionPipeline's is alive
PipelineHandle handle;
Ref<MOSDPGUpdateLogMissing> req;
spg_t get_pgid() const {
return req->get_spg();
}
- ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
private:
ClientRequest::PGPipeline &pp(PG &pg);
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
// must be after `conn` to ensure the ConnectionPipeline's is alive
PipelineHandle handle;
Ref<MOSDPGUpdateLogMissingReply> req;
class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> {
protected:
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
// must be after conn due to ConnectionPipeline's life-time
PipelineHandle handle;
spg_t get_pgid() const {
return pgid;
}
- ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return evt.get_epoch_sent(); }
+
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
};
class LocalPeeringEvent final : public PeeringEvent<LocalPeeringEvent> {
track_event<StartEvent>();
IRef opref = this;
return interruptor::with_interruption([this, pgref] {
- return pgref->get_recovery_backend()->handle_recovery_op(m);
+ return pgref->get_recovery_backend()->handle_recovery_op(m, conn);
}, [](std::exception_ptr) {
return seastar::now();
}, pgref).finally([this, opref, pgref] {
spg_t get_pgid() const {
return m->get_spg();
}
- ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return m->get_min_epoch(); }
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
> tracking_events;
private:
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
// must be after `conn` to ensure the ConnectionPipeline's is alive
PipelineHandle handle;
Ref<MOSDFastDispatchOp> m;
spg_t get_pgid() const {
return req->get_spg();
}
- ConnectionPipeline &get_connection_pipeline();
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
+ ConnectionPipeline &get_connection_pipeline();
+ seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+ assert(conn);
+ return conn.get_foreign(
+ ).then([this](auto f_conn) {
+ conn.reset();
+ return f_conn;
+ });
+ }
+ void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+ assert(!conn);
+ conn = make_local_shared_foreign(std::move(_conn));
+ }
+
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
private:
ClientRequest::PGPipeline &pp(PG &pg);
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
PipelineHandle handle;
Ref<MOSDRepOp> req;
};
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
PG::do_osd_ops(
Ref<MOSDOp> m,
+ crimson::net::ConnectionRef conn,
ObjectContextRef obc,
const OpInfo &op_info,
const SnapContext& snapc)
}
return do_osd_ops_execute<MURef<MOSDOpReply>>(
seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, obc, op_info, *m, snapc),
+ Ref<PG>{this}, obc, op_info, *m, conn, snapc),
m->ops,
[this, m, obc, may_write = op_info.may_write(),
may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
[=, this, &ops, &op_info](auto &msg_params) {
return do_osd_ops_execute<void>(
seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, std::move(obc), op_info, msg_params, SnapContext{}),
+ Ref<PG>{this},
+ std::move(obc),
+ op_info,
+ msg_params,
+ msg_params.get_connection(),
+ SnapContext{}
+ ),
ops,
std::move(success_func),
std::move(failure_func));
}
PG::interruptible_future<> PG::do_update_log_missing(
- Ref<MOSDPGUpdateLogMissing> m)
+ Ref<MOSDPGUpdateLogMissing> m,
+ crimson::net::ConnectionRef conn)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
return interruptor::make_interruptible(shard_services.get_store().do_transaction(
coll_ref, std::move(t))).then_interruptible(
- [m, lcod=peering_state.get_info().last_complete, this] {
+ [m, conn, lcod=peering_state.get_info().last_complete, this] {
if (!peering_state.pg_has_reset_since(m->get_epoch())) {
peering_state.update_last_complete_ondisk(lcod);
auto reply =
m->get_tid(),
lcod);
reply->set_priority(CEPH_MSG_PRIO_HIGH);
- return m->get_connection()->send(std::move(reply));
+ return conn->send(std::move(reply));
}
return seastar::now();
});
void replica_clear_repop_obc(
const std::vector<pg_log_entry_t> &logv);
void handle_rep_op_reply(const MOSDRepOpReply& m);
- interruptible_future<> do_update_log_missing(Ref<MOSDPGUpdateLogMissing> m);
+ interruptible_future<> do_update_log_missing(
+ Ref<MOSDPGUpdateLogMissing> m,
+ crimson::net::ConnectionRef conn);
interruptible_future<> do_update_log_missing_reply(
Ref<MOSDPGUpdateLogMissingReply> m);
do_osd_ops_iertr::future<Ret>>;
do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
Ref<MOSDOp> m,
+ crimson::net::ConnectionRef conn,
ObjectContextRef obc,
const OpInfo &op_info,
const SnapContext& snapc);
};
struct PG::do_osd_ops_params_t {
- crimson::net::ConnectionFRef &get_connection() const {
+ crimson::net::ConnectionRef &get_connection() const {
return conn;
}
osd_reqid_t get_reqid() const {
// Only used by InternalClientRequest, no op flags
bool has_flag(uint32_t flag) const {
return false;
- }
- crimson::net::ConnectionFRef &conn;
+ }
+
+ crimson::net::ConnectionRef &conn;
osd_reqid_t reqid;
utime_t mtime;
epoch_t map_epoch;
});
}
+ // Run `f(per_shard_state, shard_services, op)` on shard `core`, moving the
+ // op there if needed.  Must be called from PRIMARY_CORE (asserted).  The
+ // fast path invokes f directly when `core` is already the current shard;
+ // otherwise the op's connection is converted to a foreign handle via
+ // prepare_remote_submission(), the op hops cores with invoke_on, and the
+ // connection is re-localized with finish_remote_submission() before f runs.
+ template <typename T, typename F>
+ auto with_remote_shard_state_and_op(
+ core_id_t core,
+ typename T::IRef &&op,
+ F &&f) {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ if (seastar::this_shard_id() == core) {
+ auto &target_shard_services = shard_services.local();
+ return std::invoke(
+ std::move(f),
+ target_shard_services.local_state,
+ target_shard_services,
+ std::move(op));
+ }
+ // NOTE: op->prepare_remote_submission() is fully evaluated before the
+ // continuation's init-captures move from `op` (C++17 sequencing of the
+ // postfix expression before the call arguments).
+ return op->prepare_remote_submission(
+ ).then([op=std::move(op), f=std::move(f), this, core
+ ](auto f_conn) mutable {
+ return shard_services.invoke_on(
+ core,
+ [f=std::move(f), op=std::move(op), f_conn=std::move(f_conn)
+ ](auto &target_shard_services) mutable {
+ op->finish_remote_submission(std::move(f_conn));
+ return std::invoke(
+ std::move(f),
+ target_shard_services.local_state,
+ target_shard_services,
+ std::move(op));
+ });
+ });
+ }
+
/// Runs opref on the appropriate core, creating the pg as necessary.
template <typename T>
seastar::future<> run_with_pg_maybe_create(
op->get_pgid());
get_local_state().registry.remove_from_registry(*op);
- return with_remote_shard_state(
- core,
- [op=std::move(op)](
- PerShardState &per_shard_state,
- ShardServices &shard_services) mutable {
+ return with_remote_shard_state_and_op<T>(
+ core, std::move(op),
+ [](PerShardState &per_shard_state,
+ ShardServices &shard_services,
+ typename T::IRef op) {
per_shard_state.registry.add_to_registry(*op);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto &opref = *op;
op->get_pgid());
get_local_state().registry.remove_from_registry(*op);
- return with_remote_shard_state(
- core,
- [op=std::move(op)](
- PerShardState &per_shard_state,
- ShardServices &shard_services) mutable {
+ return with_remote_shard_state_and_op<T>(
+ core, std::move(op),
+ [](PerShardState &per_shard_state,
+ ShardServices &shard_services,
+ typename T::IRef op) {
per_shard_state.registry.add_to_registry(*op);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto &opref = *op;
}
void RecoveryBackend::handle_backfill_finish(
- MOSDPGBackfill& m)
+ MOSDPGBackfill& m,
+ crimson::net::ConnectionRef conn)
{
logger().debug("{}", __func__);
ceph_assert(!pg.is_primary());
m.query_epoch,
spg_t(pg.get_pgid().pgid, pg.get_primary().shard));
reply->set_priority(pg.get_recovery_op_priority());
- std::ignore = m.get_connection()->send(std::move(reply));
+ std::ignore = conn->send(std::move(reply));
shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
static_cast<crimson::osd::PG*>(&pg),
pg.get_pg_whoami(),
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_backfill(
- MOSDPGBackfill& m)
+ MOSDPGBackfill& m,
+ crimson::net::ConnectionRef conn)
{
logger().debug("{}", __func__);
if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
}
switch (m.op) {
case MOSDPGBackfill::OP_BACKFILL_FINISH:
- handle_backfill_finish(m);
+ handle_backfill_finish(m, conn);
[[fallthrough]];
case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
return handle_backfill_progress(m);
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_scan_get_digest(
- MOSDPGScan& m)
+ MOSDPGScan& m,
+ crimson::net::ConnectionRef conn)
{
logger().debug("{}", __func__);
if (false /* FIXME: check for backfill too full */) {
crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
).then_interruptible(
- [this, query_epoch=m.query_epoch, &conn=*(m.get_connection())
+ [this, query_epoch=m.query_epoch, conn
](auto backfill_interval) {
auto reply = crimson::make_message<MOSDPGScan>(
MOSDPGScan::OP_SCAN_DIGEST,
backfill_interval.begin,
backfill_interval.end);
encode(backfill_interval.objects, reply->get_data());
- return conn.send(std::move(reply));
+ return conn->send(std::move(reply));
});
}
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_scan(
- MOSDPGScan& m)
+ MOSDPGScan& m,
+ crimson::net::ConnectionRef conn)
{
logger().debug("{}", __func__);
if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
}
switch (m.op) {
case MOSDPGScan::OP_SCAN_GET_DIGEST:
- return handle_scan_get_digest(m);
+ return handle_scan_get_digest(m, conn);
case MOSDPGScan::OP_SCAN_DIGEST:
return handle_scan_digest(m);
default:
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_recovery_op(
- Ref<MOSDFastDispatchOp> m)
+ Ref<MOSDFastDispatchOp> m,
+ crimson::net::ConnectionRef conn)
{
switch (m->get_header().type) {
case MSG_OSD_PG_BACKFILL:
- return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m));
+ return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m), conn);
case MSG_OSD_PG_BACKFILL_REMOVE:
return handle_backfill_remove(*boost::static_pointer_cast<MOSDPGBackfillRemove>(m));
case MSG_OSD_PG_SCAN:
- return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m));
+ return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m), conn);
default:
return seastar::make_exception_future<>(
std::invalid_argument(fmt::format("invalid request type: {}",
}
virtual interruptible_future<> handle_recovery_op(
- Ref<MOSDFastDispatchOp> m);
+ Ref<MOSDFastDispatchOp> m,
+ crimson::net::ConnectionRef conn);
virtual interruptible_future<> recover_object(
const hobject_t& soid,
virtual seastar::future<> on_stop() = 0;
private:
void handle_backfill_finish(
- MOSDPGBackfill& m);
+ MOSDPGBackfill& m,
+ crimson::net::ConnectionRef conn);
interruptible_future<> handle_backfill_progress(
MOSDPGBackfill& m);
interruptible_future<> handle_backfill_finish_ack(
MOSDPGBackfill& m);
- interruptible_future<> handle_backfill(MOSDPGBackfill& m);
+ interruptible_future<> handle_backfill(
+ MOSDPGBackfill& m,
+ crimson::net::ConnectionRef conn);
interruptible_future<> handle_scan_get_digest(
- MOSDPGScan& m);
+ MOSDPGScan& m,
+ crimson::net::ConnectionRef conn);
interruptible_future<> handle_scan_digest(
MOSDPGScan& m);
interruptible_future<> handle_scan(
- MOSDPGScan& m);
+ MOSDPGScan& m,
+ crimson::net::ConnectionRef conn);
interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
};
}
RecoveryBackend::interruptible_future<>
-ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
+ReplicatedRecoveryBackend::handle_recovery_op(
+ Ref<MOSDFastDispatchOp> m,
+ crimson::net::ConnectionRef conn)
{
switch (m->get_header().type) {
case MSG_OSD_PG_PULL:
boost::static_pointer_cast<MOSDPGRecoveryDeleteReply>(m));
default:
// delegate to parent class for handling backend-agnostic recovery ops.
- return RecoveryBackend::handle_recovery_op(std::move(m));
+ return RecoveryBackend::handle_recovery_op(std::move(m), conn);
}
}
: RecoveryBackend(pg, shard_services, coll, backend)
{}
interruptible_future<> handle_recovery_op(
- Ref<MOSDFastDispatchOp> m) final;
+ Ref<MOSDFastDispatchOp> m,
+ crimson::net::ConnectionRef conn) final;
interruptible_future<> recover_object(
const hobject_t& soid,
logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
}
-seastar::future<> Watch::connect(crimson::net::ConnectionFRef conn, bool)
+seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
{
if (this->conn == conn) {
logger().debug("conn={} already connected", conn);
return out;
}
-Notify::Notify(crimson::net::ConnectionFRef conn,
+Notify::Notify(crimson::net::ConnectionRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version)
struct private_ctag_t{};
std::set<NotifyRef, std::less<>> in_progress_notifies;
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
crimson::osd::ObjectContextRef obc;
watch_info_t winfo;
}
~Watch();
- seastar::future<> connect(crimson::net::ConnectionFRef, bool);
+ seastar::future<> connect(crimson::net::ConnectionRef, bool);
void disconnect();
bool is_alive() const {
return true;
class Notify : public seastar::enable_shared_from_this<Notify> {
std::set<WatchRef> watchers;
const notify_info_t ninfo;
- crimson::net::ConnectionFRef conn;
+ crimson::net::ConnectionRef conn;
const uint64_t client_gid;
const uint64_t user_version;
bool complete{false};
/// Called on Notify timeout
void do_notify_timeout();
- Notify(crimson::net::ConnectionFRef conn,
+ Notify(crimson::net::ConnectionRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version);
template <class WatchIteratorT>
Notify(WatchIteratorT begin,
WatchIteratorT end,
- crimson::net::ConnectionFRef conn,
+ crimson::net::ConnectionRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version);
template <class WatchIteratorT>
Notify::Notify(WatchIteratorT begin,
WatchIteratorT end,
- crimson::net::ConnectionFRef conn,
+ crimson::net::ConnectionRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version)
ceph::bufferlist& data,
Message::ConnectionRef conn)
{
+#ifdef WITH_SEASTAR
+ // In crimson, conn is independently maintained outside Message.
+ ceph_assert(conn == nullptr);
+#endif
// verify crc
if (crcflags & MSG_CRC_HEADER) {
__u32 front_crc = front.crc32c(0);
public:
#ifdef WITH_SEASTAR
using ConnectionRef = crimson::net::ConnectionRef;
- using ConnectionFRef = crimson::net::ConnectionFRef;
#else
using ConnectionRef = ::ConnectionRef;
- using ConnectionFRef = ::ConnectionRef;
#endif // WITH_SEASTAR
protected:
/* time at which message was fully read */
utime_t recv_complete_stamp;
- ConnectionFRef connection;
+ ConnectionRef connection;
uint32_t magic = 0;
completion_hook->complete(0);
}
public:
- const ConnectionFRef& get_connection() const { return connection; }
+ // Classic (non-crimson) OSD path only: crimson code must never read the
+ // connection off the message, so this aborts unconditionally there.
+ const ConnectionRef& get_connection() const {
+#ifdef WITH_SEASTAR
+ // In crimson, conn is independently maintained outside Message.
+ ceph_abort();
+#endif
+ return connection;
+ }
void set_connection(ConnectionRef c) {
+#ifdef WITH_SEASTAR
+ // In crimson, conn is independently maintained outside Message.
+ // Only clearing (c == nullptr) is permitted there.
+ ceph_assert(c == nullptr);
+#endif
connection = std::move(c);
}
CompletionHook* get_completion_hook() { return completion_hook; }
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
#include "crimson/net/Interceptor.h"
+#include "crimson/net/SocketConnection.h"
#include <map>
#include <random>
using crimson::net::Interceptor;
using crimson::net::Messenger;
using crimson::net::MessengerRef;
+using crimson::net::SocketConnection;
using crimson::net::SocketPolicy;
using crimson::net::tag_bp_t;
using namespace ceph::net::test;
unsigned cnt_reset_dispatched = 0;
unsigned cnt_remote_reset_dispatched = 0;
- ConnResult(Connection& conn, unsigned index)
- : conn(conn.shared_from_this()), index(index) {}
+ // Takes a ready-made ConnectionRef; callers now build it via
+ // SocketConnection::get_local_shared_foreign_from_this() instead of
+ // shared_from_this().
+ ConnResult(ConnectionRef conn, unsigned index)
+ : conn(conn), index(index) {}
template <typename T>
void _assert_eq(const char* expr_actual, T actual,
}
private:
- void register_conn(Connection& conn) override {
+ void register_conn(SocketConnection& _conn) override {
+ auto conn = _conn.get_local_shared_foreign_from_this();
+ // Guard against double registration of the same connection, which would
+ // otherwise silently overwrite its index in `conns`.
+ auto result = find_result(conn);
+ if (result != nullptr) {
+ logger().error("The connection [{}] {} already exists when register {}",
+ result->index, *result->conn, _conn);
+ ceph_abort();
+ }
unsigned index = results.size();
results.emplace_back(conn, index);
- conns[conn.shared_from_this()] = index;
+ conns[conn] = index;
notify();
- logger().info("[{}] {} new connection registered", index, conn);
+ logger().info("[{}] {} new connection registered", index, _conn);
}
- void register_conn_closed(Connection& conn) override {
- auto result = find_result(conn.shared_from_this());
+ void register_conn_closed(SocketConnection& conn) override {
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked closed connection: {}", conn);
ceph_abort();
logger().info("[{}] {} closed({})", result->index, conn, result->state);
}
- void register_conn_ready(Connection& conn) override {
- auto result = find_result(conn.shared_from_this());
+ void register_conn_ready(SocketConnection& conn) override {
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked ready connection: {}", conn);
ceph_abort();
logger().info("[{}] {} ready", result->index, conn);
}
- void register_conn_replaced(Connection& conn) override {
- auto result = find_result(conn.shared_from_this());
+ void register_conn_replaced(SocketConnection& conn) override {
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked replaced connection: {}", conn);
ceph_abort();
logger().info("[{}] {} {}", result->index, conn, result->state);
}
- bp_action_t intercept(Connection& conn, Breakpoint bp) override {
+ bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
++breakpoints_counter[bp].counter;
- auto result = find_result(conn.shared_from_this());
+ auto result = find_result(conn.get_local_shared_foreign_from_this());
if (result == nullptr) {
logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
conn, bp, breakpoints_counter[bp].counter);
auto p = m->get_data().cbegin();
decode(pl, p);
if (pl.who == Payload::PING) {
- logger().info(" {} conn= {} {}", __func__,
- *m->get_connection(), pl);
- return reply_message(m, pl);
+ logger().info(" {} conn= {} {}", __func__, *con, pl);
+ return reply_message(m, con, pl);
} else {
ceph_assert(pl.who == Payload::PONG);
if (sent.count(pl.seq)) {
- logger().info(" {} conn= {} {}", __func__,
- m->get_connection(), pl);
- ceph_assert(conn_sent[&*m->get_connection()].front() == pl.seq);
+ logger().info(" {} conn= {} {}", __func__, *con, pl);
+ ceph_assert(conn_sent[&*con].front() == pl.seq);
ceph_assert(pl.data.contents_equal(sent[pl.seq]));
- conn_sent[&*m->get_connection()].pop_front();
+ conn_sent[&*con].pop_front();
sent.erase(pl.seq);
}
clear_pending(con);
}
- std::optional<seastar::future<>> reply_message(const MessageRef m, Payload& pl) {
+ std::optional<seastar::future<>> reply_message(
+ const MessageRef m,
+ crimson::net::ConnectionRef con,
+ Payload& pl) {
pl.who = Payload::PONG;
bufferlist bl;
encode(pl, bl);
rm->set_data(bl);
if (verbose) {
logger().info("{} conn= {} reply i= {}",
- __func__, m->get_connection(), pl.seq);
+ __func__, *con, pl.seq);
}
- return m->get_connection()->send(std::move(rm));
+ return con->send(std::move(rm));
}
seastar::future<> send_message_wrap(crimson::net::ConnectionRef con,