namespace crimson::osd {
-void ClientRequest::Orderer::requeue(
- ShardServices &shard_services, Ref<PG> pg)
+void ClientRequest::Orderer::requeue(Ref<PG> pg)
{
LOG_PREFIX(ClientRequest::Orderer::requeue);
for (auto &req: list) {
DEBUGDPP("requeueing {}", *pg, req);
req.reset_instance_handle();
- std::ignore = req.with_pg_int(shard_services, pg);
+ std::ignore = req.with_pg_int(pg);
}
}
}
ClientRequest::ClientRequest(
- ShardServices &shard_services, crimson::net::ConnectionRef conn,
+ ShardServices &_shard_services, crimson::net::ConnectionRef conn,
Ref<MOSDOp> &&m)
- : put_historic_shard_services(&shard_services),
+ : shard_services(&_shard_services),
conn(std::move(conn)),
m(std::move(m)),
instance_handle(new instance_handle_t)
[](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
}
-seastar::future<> ClientRequest::with_pg_int(
- ShardServices &shard_services, Ref<PG> pgref)
+seastar::future<> ClientRequest::with_pg_int(Ref<PG> pgref)
{
+ ceph_assert_always(shard_services);
+
LOG_PREFIX(ClientRequest::with_pg_int);
epoch_t same_interval_since = pgref->get_interval_start_epoch();
DEBUGDPP("{}: same_interval_since: {}", *pgref, *this, same_interval_since);
auto instance_handle = get_instance_handle();
auto &ihref = *instance_handle;
return interruptor::with_interruption(
- [FNAME, this, pgref, this_instance_id, &ihref, &shard_services]() mutable {
+ [FNAME, this, pgref, this_instance_id, &ihref]() mutable {
DEBUGDPP("{} start", *pgref, *this);
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
- return shard_services.send_incremental_map(
+ return shard_services->send_incremental_map(
std::ref(*conn), m->get_map_epoch()
).then([FNAME, this, this_instance_id, pgref] {
DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
}
seastar::future<> ClientRequest::with_pg(
- ShardServices &shard_services, Ref<PG> pgref)
+ ShardServices &_shard_services, Ref<PG> pgref)
{
- put_historic_shard_services = &shard_services;
+ shard_services = &_shard_services;
pgref->client_request_orderer.add_request(*this);
auto ret = on_complete.get_future();
- std::ignore = with_pg_int(
- shard_services, std::move(pgref)
- );
+ std::ignore = with_pg_int(std::move(pgref));
return ret;
}
void ClientRequest::put_historic() const
{
- ceph_assert_always(put_historic_shard_services);
- put_historic_shard_services->get_registry().put_historic(*this);
+ ceph_assert_always(shard_services);
+ shard_services->get_registry().put_historic(*this);
}
const SnapContext ClientRequest::get_snapc(
class ClientRequest final : public PhasedOperationT<ClientRequest>,
private CommonClientRequest {
- // Initially set to primary core, updated to pg core after move,
- // used by put_historic
- ShardServices *put_historic_shard_services = nullptr;
+ // Initially set to primary core, updated to pg core after with_pg()
+ ShardServices *shard_services = nullptr;
crimson::net::ConnectionRef conn;
// must be after conn due to ConnectionPipeline's life-time
list.erase(list_t::s_iterator_to(request));
intrusive_ptr_release(&request);
}
- void requeue(ShardServices &shard_services, Ref<PG> pg);
+ void requeue(Ref<PG> pg);
void clear_and_cancel(PG &pg);
};
void complete_request();
conn = make_local_shared_foreign(std::move(_conn));
}
- seastar::future<> with_pg_int(
- ShardServices &shard_services, Ref<PG> pg);
+ seastar::future<> with_pg_int(Ref<PG> pg);
public:
seastar::future<> with_pg(