#pragma once
+#include <concepts>
#include <limits>
+#include <optional>
+#include <type_traits>
+#include <vector>
+#include <seastar/core/shared_future.hh>
#include <seastar/core/smp.hh>
+#include "common/likely.h"
#include "crimson/common/errorator.h"
#include "crimson/common/utility.h"
});
}
-}
+enum class crosscore_type_t {
+ ONE, // from 1 to 1 core
+ ONE_N, // from 1 to n cores
+};
+
+/**
+ * smp_crosscore_ordering_t
+ *
+ * To preserve the event order from source to target core(s).
+ */
+template <crosscore_type_t CTypeValue>
+class smp_crosscore_ordering_t {
+ static constexpr bool IS_ONE = (CTypeValue == crosscore_type_t::ONE);
+ static constexpr bool IS_ONE_N = (CTypeValue == crosscore_type_t::ONE_N);
+ static_assert(IS_ONE || IS_ONE_N);
+
+public:
+ using seq_t = uint64_t;
+
+ smp_crosscore_ordering_t() requires IS_ONE
+ : out_seqs(0) { }
+
+ smp_crosscore_ordering_t() requires IS_ONE_N
+ : out_seqs(seastar::smp::count, 0),
+ in_controls(seastar::smp::count) {}
+
+ ~smp_crosscore_ordering_t() = default;
+
+ /*
+ * Called by the original core to get the ordering sequence
+ */
+
+ seq_t prepare_submit() requires IS_ONE {
+ return do_prepare_submit(out_seqs);
+ }
+
+ seq_t prepare_submit(core_id_t target_core) requires IS_ONE_N {
+ return do_prepare_submit(out_seqs[target_core]);
+ }
+
+ /*
+ * Called by the target core to preserve the ordering
+ */
+
+ seq_t get_in_seq() const requires IS_ONE {
+ return in_controls.seq;
+ }
+
+ seq_t get_in_seq() const requires IS_ONE_N {
+ return in_controls[seastar::this_shard_id()].seq;
+ }
+
+ bool proceed_or_wait(seq_t seq) requires IS_ONE {
+ return in_controls.proceed_or_wait(seq);
+ }
+
+ bool proceed_or_wait(seq_t seq) requires IS_ONE_N {
+ return in_controls[seastar::this_shard_id()].proceed_or_wait(seq);
+ }
+
+ seastar::future<> wait(seq_t seq) requires IS_ONE {
+ return in_controls.wait(seq);
+ }
+
+ seastar::future<> wait(seq_t seq) requires IS_ONE_N {
+ return in_controls[seastar::this_shard_id()].wait(seq);
+ }
+
+private:
+ struct in_control_t {
+ seq_t seq = 0;
+ std::optional<seastar::shared_promise<>> pr_wait;
+
+ bool proceed_or_wait(seq_t in_seq) {
+ if (in_seq == seq + 1) {
+ ++seq;
+ if (unlikely(pr_wait.has_value())) {
+ pr_wait->set_value();
+ pr_wait = std::nullopt;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ seastar::future<> wait(seq_t in_seq) {
+ assert(in_seq != seq + 1);
+ if (!pr_wait.has_value()) {
+ pr_wait = seastar::shared_promise<>();
+ }
+ return pr_wait->get_shared_future();
+ }
+ };
+
+ seq_t do_prepare_submit(seq_t &out_seq) {
+ return ++out_seq;
+ }
+
+ std::conditional_t<
+ IS_ONE,
+ seq_t, std::vector<seq_t>
+ > out_seqs;
+
+ std::conditional_t<
+ IS_ONE,
+ in_control_t, std::vector<in_control_t>
+ > in_controls;
+};
+
+} // namespace crimson
// READY state
seastar::future<> ProtocolV2::notify_out_fault(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
const char *where,
std::exception_ptr eptr,
io_handler_state _io_states)
}
seastar::future<> ProtocolV2::notify_out(
- crosscore_t::seq_t cc_seq)
+ cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
if (!crosscore.proceed_or_wait(cc_seq)) {
// CLOSING state
seastar::future<> ProtocolV2::notify_mark_down(
- crosscore_t::seq_t cc_seq)
+ cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
if (!crosscore.proceed_or_wait(cc_seq)) {
*/
private:
seastar::future<> notify_out(
- crosscore_t::seq_t cc_seq) final;
+ cc_seq_t cc_seq) final;
seastar::future<> notify_out_fault(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
const char *where,
std::exception_ptr,
io_handler_state) final;
seastar::future<> notify_mark_down(
- crosscore_t::seq_t cc_seq) final;
+ cc_seq_t cc_seq) final;
/*
* as ProtocolV2 to be called by SocketConnection
// asynchronously populated from io_handler
io_handler_state io_states;
- crosscore_t crosscore;
+ crosscore_ordering_t crosscore;
bool has_socket = false;
void IOHandler::do_set_io_state(
io_state_t new_state,
- std::optional<crosscore_t::seq_t> cc_seq,
+ std::optional<cc_seq_t> cc_seq,
FrameAssemblerV2Ref fa,
bool set_notify_out)
{
}
seastar::future<> IOHandler::set_io_state(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
io_state_t new_state,
FrameAssemblerV2Ref fa,
bool set_notify_out)
seastar::future<IOHandler::exit_dispatching_ret>
IOHandler::wait_io_exit_dispatching(
- crosscore_t::seq_t cc_seq)
+ cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
if (!crosscore.proceed_or_wait(cc_seq)) {
}
seastar::future<> IOHandler::reset_session(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
bool full)
{
assert(seastar::this_shard_id() == get_shard_id());
}
seastar::future<> IOHandler::reset_peer_state(
- crosscore_t::seq_t cc_seq)
+ cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
if (!crosscore.proceed_or_wait(cc_seq)) {
}
seastar::future<> IOHandler::requeue_out_sent(
- crosscore_t::seq_t cc_seq)
+ cc_seq_t cc_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
if (!crosscore.proceed_or_wait(cc_seq)) {
}
seastar::future<> IOHandler::requeue_out_sent_up_to(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seq_num_t msg_seq)
{
assert(seastar::this_shard_id() == get_shard_id());
seastar::future<>
IOHandler::dispatch_accept(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef conn_fref,
bool is_replace)
seastar::future<>
IOHandler::dispatch_connect(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef conn_fref)
{
seastar::future<>
IOHandler::to_new_sid(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef conn_fref,
std::optional<bool> is_replace)
}
seastar::future<> IOHandler::set_accepted_sid(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id sid,
ConnectionFRef conn_fref)
{
seastar::future<>
IOHandler::close_io(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
bool is_dispatch_reset,
bool is_replace)
{
#include <vector>
-#include <seastar/core/shared_future.hh>
#include <seastar/util/later.hh>
#include "crimson/common/gated.h"
+#include "crimson/common/smp_helpers.h"
#include "Fwd.h"
#include "SocketConnection.h"
#include "FrameAssemblerV2.h"
namespace crimson::net {
-/**
- * crosscore_t
- *
- * To preserve the event order across cores.
- */
-class crosscore_t {
-public:
- using seq_t = uint64_t;
-
- crosscore_t() = default;
- ~crosscore_t() = default;
-
- seq_t get_in_seq() const {
- return in_seq;
- }
-
- seq_t prepare_submit() {
- ++out_seq;
- return out_seq;
- }
-
- bool proceed_or_wait(seq_t seq) {
- if (seq == in_seq + 1) {
- ++in_seq;
- if (unlikely(in_pr_wait.has_value())) {
- in_pr_wait->set_value();
- in_pr_wait = std::nullopt;
- }
- return true;
- } else {
- return false;
- }
- }
-
- seastar::future<> wait(seq_t seq) {
- assert(seq != in_seq + 1);
- if (!in_pr_wait.has_value()) {
- in_pr_wait = seastar::shared_promise<>();
- }
- return in_pr_wait->get_shared_future();
- }
-
-private:
- seq_t out_seq = 0;
- seq_t in_seq = 0;
- std::optional<seastar::shared_promise<>> in_pr_wait;
-};
-
/**
* io_handler_state
*
*/
class HandshakeListener {
public:
+ using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
+ using cc_seq_t = crosscore_ordering_t::seq_t;
+
virtual ~HandshakeListener() = default;
HandshakeListener(const HandshakeListener&) = delete;
HandshakeListener &operator=(HandshakeListener &&) = delete;
virtual seastar::future<> notify_out(
- crosscore_t::seq_t cc_seq) = 0;
+ cc_seq_t cc_seq) = 0;
virtual seastar::future<> notify_out_fault(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
const char *where,
std::exception_ptr,
io_handler_state) = 0;
virtual seastar::future<> notify_mark_down(
- crosscore_t::seq_t cc_seq) = 0;
+ cc_seq_t cc_seq) = 0;
protected:
HandshakeListener() = default;
*/
class IOHandler final : public ConnectionHandler {
public:
+ using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE>;
+ using cc_seq_t = crosscore_ordering_t::seq_t;
+
IOHandler(ChainedDispatchers &,
SocketConnection &);
void print_io_stat(std::ostream &out) const;
seastar::future<> set_accepted_sid(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id sid,
ConnectionFRef conn_fref);
*/
seastar::future<> close_io(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
bool is_dispatch_reset,
bool is_replace);
friend class fmt::formatter<io_state_t>;
seastar::future<> set_io_state(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
io_state_t new_state,
FrameAssemblerV2Ref fa,
bool set_notify_out);
};
seastar::future<exit_dispatching_ret>
wait_io_exit_dispatching(
- crosscore_t::seq_t cc_seq);
+ cc_seq_t cc_seq);
seastar::future<> reset_session(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
bool full);
seastar::future<> reset_peer_state(
- crosscore_t::seq_t cc_seq);
+ cc_seq_t cc_seq);
seastar::future<> requeue_out_sent_up_to(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seq_num_t msg_seq);
seastar::future<> requeue_out_sent(
- crosscore_t::seq_t cc_seq);
+ cc_seq_t cc_seq);
seastar::future<> dispatch_accept(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef,
bool is_replace);
seastar::future<> dispatch_connect(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef);
void do_set_io_state(
io_state_t new_state,
- std::optional<crosscore_t::seq_t> cc_seq = std::nullopt,
+ std::optional<cc_seq_t> cc_seq = std::nullopt,
FrameAssemblerV2Ref fa = nullptr,
bool set_notify_out = false);
seastar::future<> do_send_keepalive();
seastar::future<> to_new_sid(
- crosscore_t::seq_t cc_seq,
+ cc_seq_t cc_seq,
seastar::shard_id new_sid,
ConnectionFRef,
std::optional<bool> is_replace);
private:
shard_states_ref_t shard_states;
- crosscore_t crosscore;
+ crosscore_ordering_t crosscore;
// drop was happening in the previous sid
std::optional<seastar::shard_id> maybe_dropped_sid;
#pragma once
-#include <seastar/core/smp.hh>
-
+#include "crimson/common/smp_helpers.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/client_request.h"
namespace crimson::osd {
-/**
- * crosscore_ordering_t
- *
- * To preserve the event order from 1 source to n target cores.
- */
-class crosscore_ordering_t {
-public:
- using seq_t = uint64_t;
-
- crosscore_ordering_t()
- : out_seqs(seastar::smp::count, 0),
- in_controls(seastar::smp::count) {}
-
- ~crosscore_ordering_t() = default;
-
- // Called by the original core to get the ordering sequence
- seq_t prepare_submit(core_id_t target_core) {
- auto &out_seq = out_seqs[target_core];
- ++out_seq;
- return out_seq;
- }
-
- /*
- * Called by the target core to preserve the ordering
- */
-
- seq_t get_in_seq() const {
- auto core = seastar::this_shard_id();
- return in_controls[core].seq;
- }
-
- bool proceed_or_wait(seq_t seq) {
- auto core = seastar::this_shard_id();
- auto &in_control = in_controls[core];
- if (seq == in_control.seq + 1) {
- ++in_control.seq;
- if (unlikely(in_control.pr_wait.has_value())) {
- in_control.pr_wait->set_value();
- in_control.pr_wait = std::nullopt;
- }
- return true;
- } else {
- return false;
- }
- }
-
- seastar::future<> wait(seq_t seq) {
- auto core = seastar::this_shard_id();
- auto &in_control = in_controls[core];
- assert(seq != in_control.seq + 1);
- if (!in_control.pr_wait.has_value()) {
- in_control.pr_wait = seastar::shared_promise<>();
- }
- return in_control.pr_wait->get_shared_future();
- }
-
-private:
- struct in_control_t {
- seq_t seq = 0;
- std::optional<seastar::shared_promise<>> pr_wait;
- };
-
- // source-side
- std::vector<seq_t> out_seqs;
- // target-side
- std::vector<in_control_t> in_controls;
-};
-
struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
+ using crosscore_ordering_t = smp_crosscore_ordering_t<crosscore_type_t::ONE_N>;
+
ConnectionPipeline client_request_conn_pipeline;
ConnectionPipeline peering_request_conn_pipeline;
ConnectionPipeline replicated_request_conn_pipeline;
template <typename T, typename F>
auto process_ordered_op_remotely(
- crosscore_ordering_t::seq_t cc_seq,
+ OSDConnectionPriv::crosscore_ordering_t::seq_t cc_seq,
ShardServices &target_shard_services,
typename T::IRef &&op,
F &&f) {