#pragma once
+#include <seastar/core/smp.hh>
+
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/client_request.h"
namespace crimson::osd {
+/**
+ * crosscore_ordering_t
+ *
+ * To preserve the event order from 1 source to n target cores.
+ *
+ * The source core stamps every submission with a per-target sequence
+ * number (prepare_submit). Each target core admits submissions strictly
+ * in that order (proceed_or_wait); a submission that arrives early parks
+ * on wait() and re-checks once the target core has admitted another one.
+ *
+ * No special member functions are declared on purpose (Rule of Zero): a
+ * user-declared destructor would suppress the implicit move operations.
+ */
+class crosscore_ordering_t {
+public:
+  using seq_t = uint64_t;
+
+  /// Sizes both tables to seastar::smp::count, one slot per core.
+  crosscore_ordering_t()
+    : out_seqs(seastar::smp::count, 0),
+      in_controls(seastar::smp::count) {}
+
+  /**
+   * Called by the original core to get the ordering sequence.
+   *
+   * Increments and returns the counter kept for target_core, so the
+   * first value handed out for a core is 1.
+   */
+  seq_t prepare_submit(core_id_t target_core) {
+    return ++out_seqs[target_core];
+  }
+
+  /*
+   * Called by the target core to preserve the ordering
+   */
+
+  /// Last sequence admitted on the calling core (0 if none yet).
+  seq_t get_in_seq() const {
+    return in_controls[seastar::this_shard_id()].seq;
+  }
+
+  /**
+   * Admits seq if it is the next one expected on the calling core.
+   *
+   * Returns true after advancing the local sequence and resolving the
+   * pending wait promise, if there is one. Returns false and changes
+   * nothing when seq is not next in line.
+   */
+  bool proceed_or_wait(seq_t seq) {
+    auto &ctl = in_controls[seastar::this_shard_id()];
+    if (seq != ctl.seq + 1) {
+      return false;
+    }
+    ++ctl.seq;
+    if (unlikely(ctl.pr_wait.has_value())) {
+      // Resolve the futures handed out by wait(), then drop the promise
+      // so that the next wait() starts with a fresh one.
+      ctl.pr_wait->set_value();
+      ctl.pr_wait.reset();
+    }
+    return true;
+  }
+
+  /**
+   * Returns a future that resolves the next time the calling core admits
+   * a sequence through proceed_or_wait().
+   *
+   * Must not be called with the sequence that is next in line (asserted).
+   * Resolution does not mean seq itself is next, so callers should
+   * re-check with proceed_or_wait().
+   */
+  seastar::future<> wait(seq_t seq) {
+    auto &ctl = in_controls[seastar::this_shard_id()];
+    assert(seq != ctl.seq + 1);
+    if (!ctl.pr_wait.has_value()) {
+      ctl.pr_wait.emplace();
+    }
+    return ctl.pr_wait->get_shared_future();
+  }
+
+private:
+  struct in_control_t {
+    // last sequence admitted on this core
+    seq_t seq = 0;
+    // engaged only while at least one submission is parked in wait()
+    std::optional<seastar::shared_promise<>> pr_wait;
+  };
+
+  // source-side
+  std::vector<seq_t> out_seqs;
+  // target-side
+  std::vector<in_control_t> in_controls;
+};
+
struct OSDConnectionPriv : public crimson::net::Connection::user_private_t { // OSD state attached to a connection; looked up through get_osd_priv()
ConnectionPipeline client_request_conn_pipeline; // one ConnectionPipeline per request class: client ...
ConnectionPipeline peering_request_conn_pipeline; // ... peering ...
ConnectionPipeline replicated_request_conn_pipeline; // ... and replicated requests
+ crosscore_ordering_t crosscore_ordering; // sequence bookkeeping for operations of this connection that are submitted to other cores
};
static inline OSDConnectionPriv &get_osd_priv(crimson::net::Connection *conn) {
"ConnectionPipeline::await_map";
} await_map;
- struct GetPG : OrderedExclusivePhaseT<GetPG> {
+ struct GetPGMapping : OrderedExclusivePhaseT<GetPGMapping> {
static constexpr auto type_name =
- "ConnectionPipeline::get_pg";
- } get_pg;
+ "ConnectionPipeline::get_pg_mapping";
+ } get_pg_mapping;
+};
+
+struct PerShardPipeline { // pipeline whose instances are held in the per-shard local state (see get_*_request_pipeline())
+ struct CreateOrWaitPG : OrderedExclusivePhaseT<CreateOrWaitPG> { // stage entered on the target shard before run_with_pg_maybe_create / run_with_pg_maybe_wait
+ static constexpr auto type_name =
+ "PerShardPipeline::create_or_wait_pg";
+ } create_or_wait_pg;
};
enum class OperationTypeCode {
: ClientRequest::StartEvent::Backend,
ConnectionPipeline::AwaitActive::BlockingEvent::Backend,
ConnectionPipeline::AwaitMap::BlockingEvent::Backend,
- ConnectionPipeline::GetPG::BlockingEvent::Backend,
+ ConnectionPipeline::GetPGMapping::BlockingEvent::Backend,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent::Backend,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
PGMap::PGCreationBlockingEvent::Backend,
ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend,
const OSD_OSDMapGate::OSDMapBlocker&) override {
}
- void handle(ConnectionPipeline::GetPG::BlockingEvent& ev,
+ void handle(ConnectionPipeline::GetPGMapping::BlockingEvent& ev, // no-op override: the body is empty, nothing is recorded for this event
const Operation& op,
- const ConnectionPipeline::GetPG& blocker) override {
+ const ConnectionPipeline::GetPGMapping& blocker) override {
+ }
+
+ void handle(PerShardPipeline::CreateOrWaitPG::BlockingEvent& ev, // no-op override: the body is empty, nothing is recorded for this event
+ const Operation& op,
+ const PerShardPipeline::CreateOrWaitPG& blocker) override {
}
void handle(PGMap::PGCreationBlockingEvent&,
: ClientRequest::StartEvent::Backend,
ConnectionPipeline::AwaitActive::BlockingEvent::Backend,
ConnectionPipeline::AwaitMap::BlockingEvent::Backend,
- ConnectionPipeline::GetPG::BlockingEvent::Backend,
+ ConnectionPipeline::GetPGMapping::BlockingEvent::Backend,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent::Backend,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent::Backend,
PGMap::PGCreationBlockingEvent::Backend,
ClientRequest::PGPipeline::AwaitMap::BlockingEvent::Backend,
const OSD_OSDMapGate::OSDMapBlocker&) override {
}
- void handle(ConnectionPipeline::GetPG::BlockingEvent& ev,
+ void handle(ConnectionPipeline::GetPGMapping::BlockingEvent& ev, // no-op override: the body is empty, nothing is recorded for this event
+ const Operation& op,
+ const ConnectionPipeline::GetPGMapping& blocker) override {
+ }
+
+ void handle(PerShardPipeline::CreateOrWaitPG::BlockingEvent& ev, // no-op override: the body is empty, nothing is recorded for this event
const Operation& op,
- const ConnectionPipeline::GetPG& blocker) override {
+ const PerShardPipeline::CreateOrWaitPG& blocker) override {
}
void handle(PGMap::PGCreationBlockingEvent&,
return get_osd_priv(conn.get()).client_request_conn_pipeline;
}
+PerShardPipeline &ClientRequest::get_pershard_pipeline( // returns the client-request PerShardPipeline of the given ShardServices
+ ShardServices &shard_services)
+{
+ return shard_services.get_client_request_pipeline();
+}
+
ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
- ConnectionPipeline::GetPG::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
CompletionEvent
> tracking_events;
epoch_t get_epoch() const { return m->get_min_epoch(); }
ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ /// Returns *conn; asserts that conn is set before dereferencing it.
+ crimson::net::Connection &get_connection() {
+   assert(conn);
+   return *conn;
+ }
+
seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
assert(conn);
return conn.get_foreign(
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
+PerShardPipeline &LogMissingRequest::get_pershard_pipeline( // returns the replicated-request PerShardPipeline of the given ShardServices
+ ShardServices &shard_services)
+{
+ return shard_services.get_replicated_request_pipeline();
+}
+
ClientRequest::PGPipeline &LogMissingRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
epoch_t get_epoch() const { return req->get_min_epoch(); }
ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ /// Returns *conn; asserts that conn is set before dereferencing it.
+ crimson::net::Connection &get_connection() {
+   assert(conn);
+   return *conn;
+ }
+
seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
assert(conn);
return conn.get_foreign(
StartEvent,
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
- ConnectionPipeline::GetPG::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
ClientRequest::PGPipeline::AwaitMap::BlockingEvent,
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
PGMap::PGCreationBlockingEvent,
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
+PerShardPipeline &LogMissingRequestReply::get_pershard_pipeline( // returns the replicated-request PerShardPipeline of the given ShardServices
+ ShardServices &shard_services)
+{
+ return shard_services.get_replicated_request_pipeline();
+}
+
ClientRequest::PGPipeline &LogMissingRequestReply::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
epoch_t get_epoch() const { return req->get_min_epoch(); }
ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ /// Returns *conn; asserts that conn is set before dereferencing it.
+ crimson::net::Connection &get_connection() {
+   assert(conn);
+   return *conn;
+ }
+
seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
assert(conn);
return conn.get_foreign(
StartEvent,
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
- ConnectionPipeline::GetPG::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
> tracking_events;
return get_osd_priv(conn.get()).peering_request_conn_pipeline;
}
+PerShardPipeline &RemotePeeringEvent::get_pershard_pipeline( // returns the peering-request PerShardPipeline of the given ShardServices
+ ShardServices &shard_services)
+{
+ return shard_services.get_peering_request_pipeline();
+}
+
void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
{
if (auto& e = get_event().get_event();
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
- ConnectionPipeline::GetPG::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
PGPeeringPipeline::AwaitMap::BlockingEvent,
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
epoch_t get_epoch() const { return evt.get_epoch_sent(); }
ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ /// Returns *conn; asserts that conn is set before dereferencing it.
+ crimson::net::Connection &get_connection() {
+   assert(conn);
+   return *conn;
+ }
+
seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
assert(conn);
return conn.get_foreign(
return get_osd_priv(conn.get()).peering_request_conn_pipeline;
}
+PerShardPipeline &RecoverySubRequest::get_pershard_pipeline( // returns the peering-request PerShardPipeline of the given ShardServices (the same getter RemotePeeringEvent uses)
+ ShardServices &shard_services)
+{
+ return shard_services.get_peering_request_pipeline();
+}
+
}
epoch_t get_epoch() const { return m->get_min_epoch(); }
ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ /// Returns *conn; asserts that conn is set before dereferencing it.
+ crimson::net::Connection &get_connection() {
+   assert(conn);
+   return *conn;
+ }
+
seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
assert(conn);
return conn.get_foreign(
StartEvent,
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
- ConnectionPipeline::GetPG::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
PGMap::PGCreationBlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
CompletionEvent
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
+PerShardPipeline &RepRequest::get_pershard_pipeline( // returns the replicated-request PerShardPipeline of the given ShardServices
+ ShardServices &shard_services)
+{
+ return shard_services.get_replicated_request_pipeline();
+}
+
ClientRequest::PGPipeline &RepRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
epoch_t get_epoch() const { return req->get_min_epoch(); }
ConnectionPipeline &get_connection_pipeline();
+
+ PerShardPipeline &get_pershard_pipeline(ShardServices &);
+
+ /// Returns *conn; asserts that conn is set before dereferencing it.
+ crimson::net::Connection &get_connection() {
+   assert(conn);
+   return *conn;
+ }
+
seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
assert(conn);
return conn.get_foreign(
StartEvent,
ConnectionPipeline::AwaitActive::BlockingEvent,
ConnectionPipeline::AwaitMap::BlockingEvent,
- ConnectionPipeline::GetPG::BlockingEvent,
+ ConnectionPipeline::GetPGMapping::BlockingEvent,
+ PerShardPipeline::CreateOrWaitPG::BlockingEvent,
ClientRequest::PGPipeline::AwaitMap::BlockingEvent,
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
PGMap::PGCreationBlockingEvent,
#include <seastar/core/shared_future.hh>
#include <seastar/core/sharded.hh>
+#include "crimson/osd/osd_connection_priv.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/pg_map.h"
});
}
+ template <typename T, typename F> // T: operation type (provides T::IRef); F: callable invoked as f(target_shard_services, op)
+ auto process_ordered_op_remotely( // invokes f for op once cc_seq is the next sequence expected on the calling core; called from inside invoke_on() by with_remote_shard_state_and_op
+ crosscore_ordering_t::seq_t cc_seq, // sequence obtained from prepare_submit() before the cross-core hop
+ ShardServices &target_shard_services, // handed to f unchanged
+ typename T::IRef &&op, // the operation; moved into f or into the retry continuation
+ F &&f) { // returns whatever f returns
+ auto &crosscore_ordering = get_osd_priv(&op->get_connection()).crosscore_ordering; // ordering state is kept per connection
+ if (crosscore_ordering.proceed_or_wait(cc_seq)) { // true: cc_seq was next in line and has now been consumed
+ return std::invoke( // NOTE(review): this return probably has to stay first; the recursive call below appears to rely on the return type deduced here
+ std::move(f),
+ target_shard_services,
+ std::move(op));
+ } else { // cc_seq arrived ahead of an earlier submission
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ logger.debug("{} got {} at the remote pg, wait at {}",
+ *op, cc_seq, crosscore_ordering.get_in_seq());
+ return crosscore_ordering.wait(cc_seq // resolves when this core next admits a sequence
+ ).then([this, cc_seq, &target_shard_services,
+ op=std::move(op), f=std::move(f)]() mutable { // op and f are owned by the continuation while waiting
+ return this->template process_ordered_op_remotely<T>( // re-check: being woken does not imply cc_seq is next
+ cc_seq, target_shard_services, std::move(op), std::move(f));
+ });
+ }
+ }
+
template <typename T, typename F>
auto with_remote_shard_state_and_op(
core_id_t core,
target_shard_services,
std::move(op));
}
+ // Note: the ordering is only preserved until f is invoked.
auto &opref = *op;
- get_local_state().registry.remove_from_registry(opref);
- return opref.prepare_remote_submission(
- ).then([op=std::move(op), f=std::move(f), this, core
- ](auto f_conn) mutable {
+ auto &crosscore_ordering = get_osd_priv(&opref.get_connection()).crosscore_ordering;
+ auto cc_seq = crosscore_ordering.prepare_submit(core);
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ logger.debug("{}: send {} to the remote pg core {}",
+ opref, cc_seq, core);
+ return opref.get_handle().complete(
+ ).then([&opref, this] {
+ get_local_state().registry.remove_from_registry(opref);
+ return opref.prepare_remote_submission();
+ }).then([op=std::move(op), f=std::move(f), this, core, cc_seq
+ ](auto f_conn) mutable {
return shard_services.invoke_on(
core,
- [f=std::move(f), op=std::move(op), f_conn=std::move(f_conn)
+ [this, cc_seq,
+ f=std::move(f), op=std::move(op), f_conn=std::move(f_conn)
](auto &target_shard_services) mutable {
op->finish_remote_submission(std::move(f_conn));
target_shard_services.local_state.registry.add_to_registry(*op);
- return std::invoke(
- std::move(f),
- target_shard_services,
- std::move(op));
+ return this->template process_ordered_op_remotely<T>(
+ cc_seq, target_shard_services, std::move(op), std::move(f));
});
});
}
&get_shard_services());
});
}).then([&logger, &opref](auto epoch) {
- logger.debug("{}: got map {}, entering get_pg", opref, epoch);
+ logger.debug("{}: got map {}, entering get_pg_mapping", opref, epoch);
return opref.template enter_stage<>(
- opref.get_connection_pipeline().get_pg);
+ opref.get_connection_pipeline().get_pg_mapping);
}).then([this, &opref] {
return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid());
}).then_wrapped([this, &logger, op=std::move(op)](auto fut) mutable {
return this->template with_remote_shard_state_and_op<T>(
core, std::move(op),
[this](ShardServices &target_shard_services,
- typename T::IRef op) {
- if constexpr (T::can_create()) {
- return this->template run_with_pg_maybe_create<T>(
- std::move(op), target_shard_services);
- } else {
- return this->template run_with_pg_maybe_wait<T>(
- std::move(op), target_shard_services);
- }
+ typename T::IRef op) {
+ auto &opref = *op;
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ logger.debug("{}: entering create_or_wait_pg", opref);
+ return opref.template enter_stage<>(
+ opref.get_pershard_pipeline(target_shard_services).create_or_wait_pg
+ ).then([this, &target_shard_services, op=std::move(op)]() mutable {
+ if constexpr (T::can_create()) {
+ return this->template run_with_pg_maybe_create<T>(
+ std::move(op), target_shard_services);
+ } else {
+ return this->template run_with_pg_maybe_wait<T>(
+ std::move(op), target_shard_services);
+ }
+ });
});
});
return std::make_pair(id, std::move(fut));
OSDState &osd_state;
OSD_OSDMapGate osdmap_gate;
+ PerShardPipeline client_request_pipeline;
+ PerShardPipeline peering_request_pipeline;
+ PerShardPipeline replicated_request_pipeline;
+
PerfCounters *perf = nullptr;
PerfCounters *recoverystate_perf = nullptr;
return dispatch_context({}, std::move(ctx));
}
+ PerShardPipeline &get_client_request_pipeline() { // accessor for local_state.client_request_pipeline
+ return local_state.client_request_pipeline;
+ }
+
+ PerShardPipeline &get_peering_request_pipeline() { // accessor for local_state.peering_request_pipeline
+ return local_state.peering_request_pipeline;
+ }
+
+ PerShardPipeline &get_replicated_request_pipeline() { // accessor for local_state.replicated_request_pipeline
+ return local_state.replicated_request_pipeline;
+ }
+
/// Return per-core tid
ceph_tid_t get_tid() { return local_state.get_tid(); }