return std::size(snap_trimq);
}
+ /**
+ * complete_rctx
+ *
+ * complete_rctx is responsible for submitting writes and messages
+ * resulting from processing a PeeringState event as well as resolving
+   * any asynchronous actions initiated by the PeeringState::Listener
+ * callbacks below. The caller is responsible for calling complete_rctx
+ * and waiting for the future to resolve before exiting the
+ * PGPeeringPipeline::process stage (see osd_operations/peering_event.h).
+ *
+   * The orderer member below ensures that operations submitted to the
+   * OSD-wide OSDSingletonState instance complete in the order in which
+   * they were initiated. This is particularly important for operations on
+   * the local and remote async reserver instances, as well as for setting
+   * and clearing pg_temp mapping requests.
+ */
+ ShardServices::singleton_orderer_t orderer;
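+  /* Illustrative sketch only (names simplified; the real call site lives in
+   * osd_operations/peering_event.h): a caller is expected to do roughly
+   *
+   *   pg->do_peering_event(evt, rctx);            // listener callbacks below
+   *                                               //   queue work on orderer
+   *   return pg->complete_rctx(std::move(rctx));  // submit queued work + rctx
+   *
+   * and to wait for the returned future before leaving the
+   * PGPeeringPipeline::process stage.
+   */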
seastar::future<> complete_rctx(PeeringCtx &&rctx) {
- return seastar::when_all_succeed(
- get_need_up_thru()
- ? shard_services.send_alive(
- get_same_interval_since())
- : seastar::now(),
+ shard_services.send_pg_temp(orderer);
+ if (get_need_up_thru()) {
+ shard_services.send_alive(orderer, get_same_interval_since());
+ }
+
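+    // Move the queued batch into a local instance so the member orderer is
+    // left empty; the batch is then submitted alongside dispatch_context below.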
+ ShardServices::singleton_orderer_t o;
+ std::swap(o, orderer);
+ return seastar::when_all(
shard_services.dispatch_context(
get_collection_ref(),
std::move(rctx)),
- shard_services.send_pg_temp()
- ).then([](auto){});
+ shard_services.run_orderer(std::move(o))
+ ).then([](auto) {});
}
void send_cluster_message(
SUBDEBUGDPP(
osd, "message {} to {} share_map_update {}",
*this, *m, osd, share_map_update);
- (void)shard_services.send_to_osd(osd, std::move(m), epoch);
+    /* We don't queue this one in the orderer because the message ref isn't
+     * copyable, and std::function requires its captured state to be copyable.
+     * This is solvable, but not worth the effort at the moment: the only
+     * ordering of message sends we care about is between messages to the same
+     * target within an interval, which doesn't really happen while processing
+     * a single event. It will probably be worth generalizing the orderer
+     * structure to fix this in the future, likely by using
+     * std::move_only_function once it is widely available. */
+ std::ignore = shard_services.send_to_osd(osd, std::move(m), epoch);
}
void send_pg_created(pg_t pgid) final {
LOG_PREFIX(PG::send_pg_created);
SUBDEBUGDPP(osd, "pgid {}", *this, pgid);
- (void)shard_services.send_pg_created(pgid);
+ shard_services.send_pg_created(orderer, pgid);
}
bool try_flush_or_schedule_async() final;
SUBDEBUGDPP(
osd, "priority {} on_grant {} on_preempt {}",
*this, on_grant->get_desc(), on_preempt->get_desc());
- // TODO -- we probably want to add a mechanism for blocking on this
- // after handling the peering event
- std::ignore = shard_services.local_request_reservation(
+ shard_services.local_request_reservation(
+ orderer,
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
priority,
on_preempt ? make_lambda_context(
[this, on_preempt=std::move(on_preempt)] (int) {
- start_peering_event_operation(std::move(*on_preempt));
- }) : nullptr);
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr
+ );
}
void update_local_background_io_priority(
unsigned priority) final {
LOG_PREFIX(PG::update_local_background_io_priority);
SUBDEBUGDPP(osd, "priority {}", *this, priority);
- // TODO -- we probably want to add a mechanism for blocking on this
- // after handling the peering event
- std::ignore = shard_services.local_update_priority(
+ shard_services.local_update_priority(
+ orderer,
pgid,
priority);
}
void cancel_local_background_io_reservation() final {
LOG_PREFIX(PG::cancel_local_background_io_reservation);
SUBDEBUGDPP(osd, "", *this);
- // TODO -- we probably want to add a mechanism for blocking on this
- // after handling the peering event
- std::ignore = shard_services.local_cancel_reservation(
+ shard_services.local_cancel_reservation(
+ orderer,
pgid);
}
SUBDEBUGDPP(
osd, "priority {} on_grant {} on_preempt {}",
*this, on_grant->get_desc(), on_preempt->get_desc());
- // TODO -- we probably want to add a mechanism for blocking on this
- // after handling the peering event
- std::ignore = shard_services.remote_request_reservation(
+ shard_services.remote_request_reservation(
+ orderer,
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
priority,
on_preempt ? make_lambda_context(
[this, on_preempt=std::move(on_preempt)] (int) {
- start_peering_event_operation(std::move(*on_preempt));
- }) : nullptr);
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr
+ );
}
void cancel_remote_recovery_reservation() final {
LOG_PREFIX(PG::cancel_remote_recovery_reservation);
SUBDEBUGDPP(osd, "", *this);
- // TODO -- we probably want to add a mechanism for blocking on this
- // after handling the peering event
- std::ignore = shard_services.remote_cancel_reservation(
- pgid);
+ shard_services.remote_cancel_reservation(orderer, pgid);
}
void schedule_event_on_commit(
void queue_want_pg_temp(const std::vector<int> &wanted) final {
LOG_PREFIX(PG::queue_want_pg_temp);
SUBDEBUGDPP(osd, "wanted {}", *this, wanted);
- // TODO -- we probably want to add a mechanism for blocking on this
- // after handling the peering event
- std::ignore = shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+ shard_services.queue_want_pg_temp(orderer, pgid.pgid, wanted);
}
void clear_want_pg_temp() final {
LOG_PREFIX(PG::clear_want_pg_temp);
SUBDEBUGDPP(osd, "", *this);
- // TODO -- we probably want to add a mechanism for blocking on this
- // after handling the peering event
- std::ignore = shard_services.remove_want_pg_temp(pgid.pgid);
+ shard_services.remove_want_pg_temp(orderer, pgid.pgid);
}
void check_recovery_sources(const OSDMapRef& newmap) final {
// Not needed yet
);
}
+public:
+ /**
+ * singleton_orderer_t
+ *
+   * schedule_for_singleton/run_orderer allow users to queue a sequence of
+   * operations on the OSDSingletonState instance and run them as an ordered
+   * batch. The user may rely on the operations completing in the order in
+   * which they were submitted.
+ *
+ * Generally, users will declare a singleton_orderer_t instance, pass it
+ * by reference to ShardServices methods implemented with
+ * schedule_for_singleton or the QUEUE_FOR_OSD_SINGLETON_* macros,
+   * and finally call run_orderer to submit the batch (see the illustrative
+   * sketch following the struct definition below).
+ */
+ struct singleton_orderer_t {
+ using remote_func_t = std::function<seastar::future<>(OSDSingletonState&)>;
+ std::vector<remote_func_t> queue;
+
+ singleton_orderer_t() = default;
+ singleton_orderer_t(singleton_orderer_t &&) = default;
+ singleton_orderer_t &operator=(singleton_orderer_t &&) = default;
+
+ singleton_orderer_t(const singleton_orderer_t &) = delete;
+ singleton_orderer_t &operator=(const singleton_orderer_t &) = delete;
+ };
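+  /* For illustration only (hypothetical caller; same_interval_since is a
+   * placeholder): typical use of the orderer together with run_orderer
+   * (below) looks roughly like
+   *
+   *   ShardServices::singleton_orderer_t orderer;
+   *   shard_services.send_pg_temp(orderer);                     // queued
+   *   shard_services.send_alive(orderer, same_interval_since);  // queued
+   *   return shard_services.run_orderer(std::move(orderer));    // run in order
+   */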
+
+ seastar::future<> run_orderer(singleton_orderer_t &&orderer) {
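+    // do_with keeps the moved-in orderer alive while do_for_each awaits each
+    // queued function in turn against the singleton.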
+ return with_singleton([](auto &singleton, auto &&orderer) {
+ return seastar::do_with(
+ std::move(orderer),
+ [&singleton](auto &orderer) {
+ return seastar::do_for_each(
+ orderer.queue,
+ [&singleton](auto &func) {
+ return std::invoke(func, singleton);
+ });
+ });
+ }, std::move(orderer));
+ }
+
+private:
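+  /* Appends a deferred call to orderer's queue: the callable and its
+   * arguments are captured into a std::function that run_orderer later
+   * invokes against the OSDSingletonState.  Note that std::function requires
+   * the captured callable and arguments to be copyable (see the comment in
+   * PG::send_cluster_message). */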
+ template <typename F, typename... Args>
+ void schedule_for_singleton(
+ singleton_orderer_t &orderer, F &&f, Args&&... args) {
+ orderer.queue.push_back(
+ [f=std::forward<F>(f),
+ args=std::make_tuple(
+ std::forward<Args>(args)...)](OSDSingletonState &state) -> seastar::future<> {
+ return seastar::futurize_apply<>(
+ std::move(f),
+ std::tuple_cat(std::make_tuple(std::ref(state)), std::move(args)));
+ });
+ }
+
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
template <typename... Args> \
auto FROM_METHOD(Args&&... args) const { \
#define FORWARD_TO_OSD_SINGLETON(METHOD) \
FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)
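+/* QUEUE_FOR_OSD_SINGLETON_TARGET mirrors FORWARD_TO_OSD_SINGLETON_TARGET,
+ * but rather than dispatching to the singleton immediately it records the
+ * call in the passed orderer for later submission via run_orderer.  For
+ * example, QUEUE_FOR_OSD_SINGLETON(send_pg_temp) generates roughly
+ *
+ *   auto send_pg_temp(singleton_orderer_t &orderer, Args&&... args);
+ *
+ * which queues a call to the singleton's send_pg_temp. */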
+#define QUEUE_FOR_OSD_SINGLETON_TARGET(METHOD, TARGET) \
+ template <typename... Args> \
+ auto METHOD(singleton_orderer_t &orderer, Args&&... args) { \
+ return schedule_for_singleton( \
+ orderer, \
+ [](auto &local_state, auto&&... args) { \
+ return local_state.TARGET( \
+ std::forward<decltype(args)>(args)...); \
+ }, std::forward<Args>(args)...); \
+ }
+#define QUEUE_FOR_OSD_SINGLETON(METHOD) \
+ QUEUE_FOR_OSD_SINGLETON_TARGET(METHOD, METHOD)
+
public:
template <typename... PSSArgs>
ShardServices(
FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd)
FORWARD_TO_OSD_SINGLETON(osdmap_subscribe)
- FORWARD_TO_OSD_SINGLETON(queue_want_pg_temp)
- FORWARD_TO_OSD_SINGLETON(remove_want_pg_temp)
+ QUEUE_FOR_OSD_SINGLETON(queue_want_pg_temp)
+ QUEUE_FOR_OSD_SINGLETON(remove_want_pg_temp)
FORWARD_TO_OSD_SINGLETON(requeue_pg_temp)
- FORWARD_TO_OSD_SINGLETON(send_pg_created)
- FORWARD_TO_OSD_SINGLETON(send_alive)
- FORWARD_TO_OSD_SINGLETON(send_pg_temp)
+ QUEUE_FOR_OSD_SINGLETON(send_pg_created)
+ QUEUE_FOR_OSD_SINGLETON(send_alive)
+ QUEUE_FOR_OSD_SINGLETON(send_pg_temp)
FORWARD_TO_LOCAL_CONST(get_mnow)
FORWARD_TO_LOCAL(get_hb_stamps)
FORWARD_TO_LOCAL(update_shard_superblock)
snap_dump_reservations,
snap_reserver.dump)
- auto local_update_priority(spg_t pgid, unsigned newprio) {
+
+ auto local_update_priority(
+ singleton_orderer_t &orderer,
+ spg_t pgid, unsigned newprio) {
LOG_PREFIX(ShardServices::local_update_priority);
SUBDEBUG(osd, "sending to singleton pgid {} newprio {}", pgid, newprio);
- return with_singleton([FNAME, pgid, newprio](auto &singleton) {
- SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
- return singleton.local_reserver.update_priority(pgid, newprio);
- });
+ return schedule_for_singleton(
+ orderer,
+ [FNAME, pgid, newprio](auto &singleton) {
+ SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
+ return singleton.local_reserver.update_priority(pgid, newprio);
+ });
}
- auto local_cancel_reservation(spg_t pgid) {
+ auto local_cancel_reservation(
+ singleton_orderer_t &orderer,
+ spg_t pgid) {
LOG_PREFIX(ShardServices::local_cancel_reservation);
SUBDEBUG(osd, "sending to singleton pgid {}", pgid);
- return with_singleton([FNAME, pgid](auto &singleton) {
- SUBDEBUG(osd, "on singleton pgid {}", pgid);
- return singleton.local_reserver.cancel_reservation(pgid);
- });
+ return schedule_for_singleton(
+ orderer,
+ [FNAME, pgid](auto &singleton) {
+ SUBDEBUG(osd, "on singleton pgid {}", pgid);
+ return singleton.local_reserver.cancel_reservation(pgid);
+ });
}
FORWARD_TO_OSD_SINGLETON_TARGET(
local_dump_reservations,
local_reserver.dump)
- auto remote_update_priority(spg_t pgid, unsigned newprio) {
+ auto remote_update_priority(
+ singleton_orderer_t &orderer,
+ spg_t pgid, unsigned newprio) {
LOG_PREFIX(ShardServices::remote_update_priority);
SUBDEBUG(osd, "sending to singleton pgid {} newprio {}", pgid, newprio);
- return with_singleton([FNAME, pgid, newprio](auto &singleton) {
- SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
- return singleton.remote_reserver.update_priority(pgid, newprio);
- });
+ return schedule_for_singleton(
+ orderer,
+ [FNAME, pgid, newprio](auto &singleton) {
+ SUBDEBUG(osd, "on singleton pgid {} newprio {}", pgid, newprio);
+ return singleton.remote_reserver.update_priority(pgid, newprio);
+ });
}
- auto remote_cancel_reservation(spg_t pgid) {
+ auto remote_cancel_reservation(
+ singleton_orderer_t &orderer,
+ spg_t pgid) {
LOG_PREFIX(ShardServices::remote_cancel_reservation);
SUBDEBUG(osd, "sending to singleton pgid {}", pgid);
- return with_singleton([FNAME, pgid](auto &singleton) {
- SUBDEBUG(osd, "on singleton pgid {}", pgid);
- return singleton.remote_reserver.cancel_reservation(pgid);
- });
+ return schedule_for_singleton(
+ orderer,
+ [FNAME, pgid](auto &singleton) {
+ SUBDEBUG(osd, "on singleton pgid {}", pgid);
+ return singleton.remote_reserver.cancel_reservation(pgid);
+ });
}
FORWARD_TO_OSD_SINGLETON_TARGET(
remote_dump_reservations,
});
});
}
- seastar::future<> local_request_reservation(
+ void local_request_reservation(
+ singleton_orderer_t &orderer,
spg_t item,
Context *on_reserved,
unsigned prio,
Context *on_preempt) {
LOG_PREFIX(ShardServices::local_request_reservation);
SUBDEBUG(osd, "sending to singleton pgid {} prio {}", item, prio);
- return with_singleton(
+ return schedule_for_singleton(
+ orderer,
[FNAME, item, prio](
OSDSingletonState &singleton,
Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
invoke_context_on_core(seastar::this_shard_id(), on_reserved),
invoke_context_on_core(seastar::this_shard_id(), on_preempt));
}
- seastar::future<> remote_request_reservation(
+ void remote_request_reservation(
+ singleton_orderer_t &orderer,
spg_t item,
Context *on_reserved,
unsigned prio,
Context *on_preempt) {
LOG_PREFIX(ShardServices::remote_request_reservation);
SUBDEBUG(osd, "sending to singleton pgid {} prio {}", item, prio);
- return with_singleton(
+ return schedule_for_singleton(
+ orderer,
[FNAME, item, prio](
OSDSingletonState &singleton,
Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
invoke_context_on_core(seastar::this_shard_id(), on_reserved),
invoke_context_on_core(seastar::this_shard_id(), on_preempt));
}
- seastar::future<> snap_request_reservation(
+ void snap_request_reservation(
+ singleton_orderer_t &orderer,
spg_t item,
Context *on_reserved,
unsigned prio) {
- return with_singleton(
+ return schedule_for_singleton(
+ orderer,
[item, prio](OSDSingletonState &singleton,
Context *wrapped_on_reserved) {
return singleton.snap_reserver.request_reservation(