*/
template <typename PtrType>
class local_shared_foreign_ptr {
- using element_type = typename std::pointer_traits<PtrType>::element_type;
- using pointer = element_type*;
-
seastar::lw_shared_ptr<seastar::foreign_ptr<PtrType>> ptr;
/// Wraps a pointer object and remembers the current core.
seastar::foreign_ptr<T> &&);
public:
+ using element_type = typename std::pointer_traits<PtrType>::element_type;
+ using pointer = element_type*;
+
/// Constructs a null local_shared_foreign_ptr<>.
local_shared_foreign_ptr() = default;
using ConnectionLRef = seastar::shared_ptr<Connection>;
using ConnectionFRef = seastar::foreign_ptr<ConnectionLRef>;
using ConnectionRef = ::crimson::local_shared_foreign_ptr<ConnectionLRef>;
+using ConnectionFFRef = seastar::foreign_ptr<ConnectionRef>;
+using ConnectionXcoreRef = ::crimson::local_shared_foreign_ptr<ConnectionRef>;
class Dispatcher;
class ChainedDispatchers;
logger().debug("{}", __func__);
struct connect_ctx_t {
ObjectContext::watch_key_t key;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionXcoreRef conn;
watch_info_t info;
connect_ctx_t(
const OSDOp& osd_op,
const ExecutableMessage& msg,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
: key(osd_op.op.watch.cookie, msg.get_reqid().name),
conn(conn),
info(create_watch_info(osd_op, msg, conn->get_peer_addr())) {
return crimson::ct_error::enoent::make();
}
struct notify_ctx_t {
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionXcoreRef conn;
notify_info_t ninfo;
const uint64_t client_gid;
const epoch_t epoch;
notify_ctx_t(const ExecutableMessage& msg,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
: conn(conn),
client_gid(msg.get_reqid().name.num()),
epoch(msg.get_map_epoch()) {
ObjectContextRef _obc,
const OpInfo& op_info,
abstracted_msg_t&& msg,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionXcoreRef conn,
const SnapContext& _snapc)
: pg(std::move(pg)),
obc(std::move(_obc)),
class ExecutableMessagePimpl final : ExecutableMessage {
const ImplT* pimpl;
// In crimson, conn is independently maintained outside Message.
- const crimson::net::ConnectionRef conn;
+ const crimson::net::ConnectionXcoreRef conn;
public:
ExecutableMessagePimpl(const ImplT* pimpl,
- const crimson::net::ConnectionRef conn)
+ const crimson::net::ConnectionXcoreRef conn)
: pimpl(pimpl), conn(conn) {
}
ceph::static_ptr<ExecutableMessage,
sizeof(ExecutableMessagePimpl<void>)>;
abstracted_msg_t msg;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionXcoreRef conn;
std::optional<osd_op_params_t> osd_op_params;
bool user_modify = false;
ceph::os::Transaction txn;
ObjectContextRef obc,
const OpInfo& op_info,
abstracted_msg_t&& msg,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionXcoreRef conn,
const SnapContext& snapc);
public:
ObjectContextRef obc,
const OpInfo& op_info,
const MsgT& msg,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionXcoreRef conn,
const SnapContext& snapc)
: OpsExecuter(
std::move(pg),
ShardServices &_shard_services, crimson::net::ConnectionRef conn,
Ref<MOSDOp> &&m)
: shard_services(&_shard_services),
- conn(std::move(conn)),
+ l_conn(std::move(conn)),
m(std::move(m)),
instance_handle(new instance_handle_t)
{}
ConnectionPipeline &ClientRequest::get_connection_pipeline()
{
- return get_osd_priv(conn.get()).client_request_conn_pipeline;
+ return get_osd_priv(&get_local_connection()
+ ).client_request_conn_pipeline;
}
PerShardPipeline &ClientRequest::get_pershard_pipeline(
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
return shard_services->send_incremental_map(
- std::ref(*conn), m->get_map_epoch()
+ std::ref(get_foreign_connection()), m->get_map_epoch()
).then([FNAME, this, this_instance_id, pgref] {
DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
pgref->client_request_orderer.remove_request(*this);
m
).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
// TODO: gate the crosscore sending
- return conn->send_with_throttling(std::move(reply));
+ return get_foreign_connection().send_with_throttling(std::move(reply));
});
}
reply->set_reply_versions(eversion_t(), 0);
reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
// TODO: gate the crosscore sending
- return conn->send_with_throttling(std::move(reply));
+ return get_foreign_connection().send_with_throttling(std::move(reply));
}
ClientRequest::interruptible_future<>
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
reply->set_reply_versions(completed->version, completed->user_version);
// TODO: gate the crosscore sending
- return conn->send_with_throttling(std::move(reply));
+ return get_foreign_connection().send_with_throttling(std::move(reply));
} else {
DEBUGDPP("{}.{}: not completed, entering get_obc stage",
*pg, *this, this_instance_id);
return reply_op_error(pg, -ENAMETOOLONG);
} else if (m->get_hobj().oid.name.empty()) {
return reply_op_error(pg, -EINVAL);
- } else if (pg->get_osdmap()->is_blocklisted(conn->get_peer_addr())) {
+ } else if (pg->get_osdmap()->is_blocklisted(
+ get_foreign_connection().get_peer_addr())) {
DEBUGDPP("{}.{}: {} is blocklisted",
- *pg, *this, this_instance_id, conn->get_peer_addr());
+ *pg, *this, this_instance_id, get_foreign_connection().get_peer_addr());
return reply_op_error(pg, -EBLOCKLISTED);
}
*pg, *this, this_instance_id, m->get_hobj());
}
}
- return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
+ return pg->do_osd_ops(
+ m, r_conn, obc, op_info, snapc
+ ).safe_then_unpack_interruptible(
[FNAME, this, pg, this_instance_id, &ihref](
auto submitted, auto all_completed) mutable {
return submitted.then_interruptible(
reply=std::move(reply)]() mutable {
DEBUGDPP("{}.{}: sending response",
*pg, *this, this_instance_id);
- return conn->send(std::move(reply));
+ // TODO: gate the crosscore sending
+ return get_foreign_connection(
+ ).send_with_throttling(std::move(reply));
});
}, crimson::ct_error::eagain::handle(
[this, pg, this_instance_id, &ihref]() mutable {
// Initially set to primary core, updated to pg core after with_pg()
ShardServices *shard_services = nullptr;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
// must be after conn due to ConnectionPipeline's life-time
Ref<MOSDOp> m;
OpInfo op_info;
PerShardPipeline &get_pershard_pipeline(ShardServices &);
- crimson::net::Connection &get_connection() {
- assert(conn);
- return *conn;
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
};
- seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
- assert(conn);
- return conn.get_foreign(
- ).then([this](auto f_conn) {
- conn.reset();
- return f_conn;
- });
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
}
- void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
- assert(!conn);
- conn = make_local_shared_foreign(std::move(_conn));
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
}
seastar::future<> with_pg_int(Ref<PG> pg);
LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn,
Ref<MOSDPGUpdateLogMissing> &&req)
- : conn{std::move(conn)},
+ : l_conn{std::move(conn)},
req{std::move(req)}
{}
ConnectionPipeline &LogMissingRequest::get_connection_pipeline()
{
- return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+ return get_osd_priv(&get_local_connection()
+ ).client_request_conn_pipeline;
}
PerShardPipeline &LogMissingRequest::get_pershard_pipeline(
std::move(trigger), req->min_epoch);
});
}).then_interruptible([this, pg](auto) {
- return pg->do_update_log_missing(req, conn);
+ return pg->do_update_log_missing(req, r_conn);
}).then_interruptible([this] {
logger().debug("{}: complete", *this);
return handle.complete();
PerShardPipeline &get_pershard_pipeline(ShardServices &);
- crimson::net::Connection &get_connection() {
- assert(conn);
- return *conn;
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
};
- seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
- assert(conn);
- return conn.get_foreign(
- ).then([this](auto f_conn) {
- conn.reset();
- return f_conn;
- });
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
}
- void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
- assert(!conn);
- conn = make_local_shared_foreign(std::move(_conn));
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
}
seastar::future<> with_pg(
private:
ClientRequest::PGPipeline &client_pp(PG &pg);
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
// must be after `conn` to ensure the ConnectionPipeline's is alive
PipelineHandle handle;
Ref<MOSDPGUpdateLogMissing> req;
LogMissingRequestReply::LogMissingRequestReply(
crimson::net::ConnectionRef&& conn,
Ref<MOSDPGUpdateLogMissingReply> &&req)
- : conn{std::move(conn)},
+ : l_conn{std::move(conn)},
req{std::move(req)}
{}
ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline()
{
- return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+ return get_osd_priv(&get_local_connection()
+ ).replicated_request_conn_pipeline;
}
PerShardPipeline &LogMissingRequestReply::get_pershard_pipeline(
PerShardPipeline &get_pershard_pipeline(ShardServices &);
- crimson::net::Connection &get_connection() {
- assert(conn);
- return *conn;
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
};
- seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
- assert(conn);
- return conn.get_foreign(
- ).then([this](auto f_conn) {
- conn.reset();
- return f_conn;
- });
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
}
- void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
- assert(!conn);
- conn = make_local_shared_foreign(std::move(_conn));
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
}
seastar::future<> with_pg(
private:
ClientRequest::PGPipeline &client_pp(PG &pg);
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
// must be after `conn` to ensure the ConnectionPipeline's is alive
PipelineHandle handle;
Ref<MOSDPGUpdateLogMissingReply> req;
ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
{
- return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+ return get_osd_priv(&get_local_connection()
+ ).client_request_conn_pipeline;
}
PerShardPipeline &RemotePeeringEvent::get_pershard_pipeline(
class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> {
protected:
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
// must be after conn due to ConnectionPipeline's life-time
PipelineHandle handle;
template <typename... Args>
RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) :
PeeringEvent(std::forward<Args>(args)...),
- conn(conn)
+ l_conn(conn)
{}
std::tuple<
PerShardPipeline &get_pershard_pipeline(ShardServices &);
- crimson::net::Connection &get_connection() {
- assert(conn);
- return *conn;
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
};
- seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
- assert(conn);
- return conn.get_foreign(
- ).then([this](auto f_conn) {
- conn.reset();
- return f_conn;
- });
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
}
- void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
- assert(!conn);
- conn = make_local_shared_foreign(std::move(_conn));
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
}
};
return interruptor::with_interruption([this, pgref] {
LOG_PREFIX(RecoverySubRequest::with_pg);
DEBUGI("{}: {}", "RecoverySubRequest::with_pg", *this);
- return pgref->get_recovery_backend()->handle_recovery_op(m, conn
+ return pgref->get_recovery_backend()->handle_recovery_op(m, r_conn
).then_interruptible([this] {
LOG_PREFIX(RecoverySubRequest::with_pg);
DEBUGI("{}: complete", *this);
ConnectionPipeline &RecoverySubRequest::get_connection_pipeline()
{
- return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+ return get_osd_priv(&get_local_connection()
+ ).client_request_conn_pipeline;
}
PerShardPipeline &RecoverySubRequest::get_pershard_pipeline(
RecoverySubRequest(
crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp>&& m)
- : conn(conn), m(m) {}
+ : l_conn(conn), m(m) {}
void print(std::ostream& out) const final
{
PerShardPipeline &get_pershard_pipeline(ShardServices &);
- crimson::net::Connection &get_connection() {
- assert(conn);
- return *conn;
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
};
- seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
- assert(conn);
- return conn.get_foreign(
- ).then([this](auto f_conn) {
- conn.reset();
- return f_conn;
- });
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
}
- void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
- assert(!conn);
- conn = make_local_shared_foreign(std::move(_conn));
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
}
seastar::future<> with_pg(
> tracking_events;
private:
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
// must be after `conn` to ensure the ConnectionPipeline's is alive
PipelineHandle handle;
Ref<MOSDFastDispatchOp> m;
RepRequest::RepRequest(crimson::net::ConnectionRef&& conn,
Ref<MOSDRepOp> &&req)
- : conn{std::move(conn)},
+ : l_conn{std::move(conn)},
req{std::move(req)}
{}
ConnectionPipeline &RepRequest::get_connection_pipeline()
{
- return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+ return get_osd_priv(&get_local_connection()
+ ).client_request_conn_pipeline;
}
PerShardPipeline &RepRequest::get_pershard_pipeline(
PerShardPipeline &get_pershard_pipeline(ShardServices &);
- crimson::net::Connection &get_connection() {
- assert(conn);
- return *conn;
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
};
- seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
- assert(conn);
- return conn.get_foreign(
- ).then([this](auto f_conn) {
- conn.reset();
- return f_conn;
- });
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
}
- void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
- assert(!conn);
- conn = make_local_shared_foreign(std::move(_conn));
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
}
seastar::future<> with_pg(
private:
ClientRequest::PGPipeline &client_pp(PG &pg);
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
PipelineHandle handle;
Ref<MOSDRepOp> req;
};
template <class T>
ConnectionPipeline &RemoteScrubEventBaseT<T>::get_connection_pipeline()
{
- return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+ return get_osd_priv(&get_local_connection()
+ ).peering_request_conn_pipeline;
}
template <class T>
PipelineHandle handle;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionRef l_conn;
+ crimson::net::ConnectionXcoreRef r_conn;
+
epoch_t epoch;
spg_t pgid;
public:
RemoteScrubEventBaseT(
crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid)
- : conn(conn), epoch(epoch), pgid(pgid) {}
+ : l_conn(std::move(conn)), epoch(epoch), pgid(pgid) {}
PGPeeringPipeline &get_peering_pipeline(PG &pg);
+
ConnectionPipeline &get_connection_pipeline();
+
PerShardPipeline &get_pershard_pipeline(ShardServices &);
- crimson::net::Connection &get_connection() {
- assert(conn);
- return *conn;
+ crimson::net::Connection &get_local_connection() {
+ assert(l_conn);
+ assert(!r_conn);
+ return *l_conn;
};
+ crimson::net::Connection &get_foreign_connection() {
+ assert(r_conn);
+ assert(!l_conn);
+ return *r_conn;
+ };
+
+ crimson::net::ConnectionFFRef prepare_remote_submission() {
+ assert(l_conn);
+ assert(!r_conn);
+ auto ret = seastar::make_foreign(std::move(l_conn));
+ l_conn.reset();
+ return ret;
+ }
+
+ void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+ assert(conn);
+ assert(!l_conn);
+ assert(!r_conn);
+ r_conn = make_local_shared_foreign(std::move(conn));
+ }
+
static constexpr bool can_create() { return false; }
spg_t get_pgid() const {
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return epoch; }
- seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
- assert(conn);
- return conn.get_foreign(
- ).then([this](auto f_conn) {
- conn.reset();
- return f_conn;
- });
- }
- void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
- assert(!conn);
- conn = make_local_shared_foreign(std::move(_conn));
- }
-
seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
PG::do_osd_ops(
Ref<MOSDOp> m,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionXcoreRef conn,
ObjectContextRef obc,
const OpInfo &op_info,
const SnapContext& snapc)
PG::interruptible_future<> PG::do_update_log_missing(
Ref<MOSDPGUpdateLogMissing> m,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
void handle_rep_op_reply(const MOSDRepOpReply& m);
interruptible_future<> do_update_log_missing(
Ref<MOSDPGUpdateLogMissing> m,
- crimson::net::ConnectionRef conn);
+ crimson::net::ConnectionXcoreRef conn);
interruptible_future<> do_update_log_missing_reply(
Ref<MOSDPGUpdateLogMissingReply> m);
do_osd_ops_iertr::future<Ret>>;
do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
Ref<MOSDOp> m,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionXcoreRef conn,
ObjectContextRef obc,
const OpInfo &op_info,
const SnapContext& snapc);
};
struct PG::do_osd_ops_params_t {
- crimson::net::ConnectionRef &get_connection() const {
+ crimson::net::ConnectionXcoreRef &get_connection() const {
return conn;
}
osd_reqid_t get_reqid() const {
return orig_source_inst.name;
}
- crimson::net::ConnectionRef &conn;
+ crimson::net::ConnectionXcoreRef &conn;
osd_reqid_t reqid;
utime_t mtime;
epoch_t map_epoch;
ShardServices &target_shard_services,
typename T::IRef &&op,
F &&f) {
- auto &crosscore_ordering = get_osd_priv(&op->get_connection()).crosscore_ordering;
+ auto &crosscore_ordering = get_osd_priv(
+ &op->get_foreign_connection()).crosscore_ordering;
if (crosscore_ordering.proceed_or_wait(cc_seq)) {
return std::invoke(
std::move(f),
F &&f) {
ceph_assert(op->use_count() == 1);
if (seastar::this_shard_id() == core) {
+ auto f_conn = op->prepare_remote_submission();
+ op->finish_remote_submission(std::move(f_conn));
auto &target_shard_services = shard_services.local();
return std::invoke(
std::move(f),
}
// Note: the ordering in only preserved until f is invoked.
auto &opref = *op;
- auto &crosscore_ordering = get_osd_priv(&opref.get_connection()).crosscore_ordering;
+ auto &crosscore_ordering = get_osd_priv(
+ &opref.get_local_connection()).crosscore_ordering;
auto cc_seq = crosscore_ordering.prepare_submit(core);
auto &logger = crimson::get_logger(ceph_subsys_osd);
logger.debug("{}: send {} to the remote pg core {}",
opref, cc_seq, core);
return opref.get_handle().complete(
- ).then([&opref, this] {
- get_local_state().registry.remove_from_registry(opref);
- return opref.prepare_remote_submission();
- }).then([op=std::move(op), f=std::move(f), this, core, cc_seq
- ](auto f_conn) mutable {
+ ).then([this, core, cc_seq,
+ op=std::move(op), f=std::move(f)]() mutable {
+ get_local_state().registry.remove_from_registry(*op);
+ auto f_conn = op->prepare_remote_submission();
return shard_services.invoke_on(
core,
[this, cc_seq,
void RecoveryBackend::handle_backfill_finish(
MOSDPGBackfill& m,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
ceph_assert(!pg.is_primary());
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_backfill(
MOSDPGBackfill& m,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_scan_get_digest(
MOSDPGScan& m,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
if (false /* FIXME: check for backfill too full */) {
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_scan(
MOSDPGScan& m,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
{
logger().debug("{}", __func__);
if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
RecoveryBackend::interruptible_future<>
RecoveryBackend::handle_recovery_op(
Ref<MOSDFastDispatchOp> m,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
{
switch (m->get_header().type) {
case MSG_OSD_PG_BACKFILL:
virtual interruptible_future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m,
- crimson::net::ConnectionRef conn);
+ crimson::net::ConnectionXcoreRef conn);
virtual interruptible_future<> recover_object(
const hobject_t& soid,
private:
void handle_backfill_finish(
MOSDPGBackfill& m,
- crimson::net::ConnectionRef conn);
+ crimson::net::ConnectionXcoreRef conn);
interruptible_future<> handle_backfill_progress(
MOSDPGBackfill& m);
interruptible_future<> handle_backfill_finish_ack(
MOSDPGBackfill& m);
interruptible_future<> handle_backfill(
MOSDPGBackfill& m,
- crimson::net::ConnectionRef conn);
+ crimson::net::ConnectionXcoreRef conn);
interruptible_future<> handle_scan_get_digest(
MOSDPGScan& m,
- crimson::net::ConnectionRef conn);
+ crimson::net::ConnectionXcoreRef conn);
interruptible_future<> handle_scan_digest(
MOSDPGScan& m);
interruptible_future<> handle_scan(
MOSDPGScan& m,
- crimson::net::ConnectionRef conn);
+ crimson::net::ConnectionXcoreRef conn);
interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
};
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_recovery_op(
Ref<MOSDFastDispatchOp> m,
- crimson::net::ConnectionRef conn)
+ crimson::net::ConnectionXcoreRef conn)
{
switch (m->get_header().type) {
case MSG_OSD_PG_PULL:
{}
interruptible_future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m,
- crimson::net::ConnectionRef conn) final;
+ crimson::net::ConnectionXcoreRef conn) final;
interruptible_future<> recover_object(
const hobject_t& soid,
logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
}
-seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
+seastar::future<> Watch::connect(crimson::net::ConnectionXcoreRef conn, bool)
{
if (this->conn == conn) {
logger().debug("conn={} already connected", conn);
return out;
}
-Notify::Notify(crimson::net::ConnectionRef conn,
+Notify::Notify(crimson::net::ConnectionXcoreRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version)
struct private_ctag_t{};
std::set<NotifyRef, std::less<>> in_progress_notifies;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionXcoreRef conn;
crimson::osd::ObjectContextRef obc;
watch_info_t winfo;
}
~Watch();
- seastar::future<> connect(crimson::net::ConnectionRef, bool);
+ seastar::future<> connect(crimson::net::ConnectionXcoreRef, bool);
void disconnect();
bool is_alive() const {
return true;
class Notify : public seastar::enable_shared_from_this<Notify> {
std::set<WatchRef> watchers;
const notify_info_t ninfo;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionXcoreRef conn;
const uint64_t client_gid;
const uint64_t user_version;
bool complete{false};
/// Called on Notify timeout
void do_notify_timeout();
- Notify(crimson::net::ConnectionRef conn,
+ Notify(crimson::net::ConnectionXcoreRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version);
template <class WatchIteratorT>
Notify(WatchIteratorT begin,
WatchIteratorT end,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionXcoreRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version);
template <class WatchIteratorT>
Notify::Notify(WatchIteratorT begin,
WatchIteratorT end,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionXcoreRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version)