return shard_services.get_client_request_pipeline();
}
-ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
+CommonPGPipeline &ClientRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
}
DEBUGDPP("{} start", *pgref, *this);
PG &pg = *pgref;
+
+ DEBUGDPP("{}.{}: entering wait_pg_ready stage",
+ *pgref, *this, this_instance_id);
+ // The prior stage (PerShardPipeline::create_or_wait_pg) is OrderedExclusive
+ // and wait_pg_ready is OrderedConcurrent, so this transition can never
+ // block; enter_stage_sync is therefore legal here and cheaper than
+ // enter_stage.
+ ihref.enter_stage_sync(client_pp(pg).wait_pg_ready, *this);
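+ // wait_pg_ready subsumes the former await_map and wait_for_active stages;
+ // both blockers below are awaited from within this single concurrent stage.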
+
if (!m->get_hobj().get_key().empty()) {
// There are no users of locator. It was used to ensure that multipart-upload
// parts would end up in the same PG so that they could be clone_range'd into
DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
co_return;
}
- DEBUGDPP("{}.{}: entering await_map stage",
- *pgref, *this, this_instance_id);
- co_await ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this);
- DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
- pg, *this, this_instance_id);
+
auto map_epoch = co_await interruptor::make_interruptible(
ihref.enter_blocker(
*this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
m->get_min_epoch(), nullptr));
- DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
+ DEBUGDPP("{}.{}: waited for epoch {}, waiting for active",
pg, *this, this_instance_id, map_epoch);
- co_await ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
-
- DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
- pg, *this, this_instance_id);
co_await interruptor::make_interruptible(
ihref.enter_blocker(
*this,
pg.wait_for_active_blocker,
&decltype(pg.wait_for_active_blocker)::wait));
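+ // With the PG active, enter the final PG-wide stage (get_obc); ordering
+ // then moves to the per-object OBC pipeline in process_op().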
+ co_await ihref.enter_stage<interruptor>(client_pp(pg).get_obc, *this);
+
if (int res = op_info.set_from_op(&*m, *pg.get_osdmap());
res != 0) {
co_await reply_op_error(pgref, res);
instance_handle_t &ihref, Ref<PG> pg, unsigned this_instance_id)
{
LOG_PREFIX(ClientRequest::process_op);
- ihref.enter_stage_sync(client_pp(*pg).recover_missing, *this);
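+ // From here on ordering is per object: the orderer, keyed on m->get_hobj(),
+ // supplies the CommonOBCPipeline (obc_pp()) used for the remaining stages.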
+ ihref.obc_orderer = pg->obc_loader.get_obc_orderer(m->get_hobj());
+ auto obc_manager = pg->obc_loader.get_obc_manager(
+ *(ihref.obc_orderer),
+ m->get_hobj());
+ co_await ihref.enter_stage<interruptor>(
+ ihref.obc_orderer->obc_pp().process, *this);
+
if (!pg->is_primary()) {
DEBUGDPP(
"Skipping recover_missings on non primary pg for soid {}",
}
}
- /**
- * The previous stage of recover_missing is a concurrent phase.
- * Checking for already_complete requests must done exclusively.
- * Since get_obc is also an exclusive stage, we can merge both stages into
- * a single stage and avoid stage switching overhead.
- */
- DEBUGDPP("{}.{}: entering check_already_complete_get_obc",
- *pg, *this, this_instance_id);
- co_await ihref.enter_stage<interruptor>(
- client_pp(*pg).check_already_complete_get_obc, *this);
DEBUGDPP("{}.{}: checking already_complete",
*pg, *this, this_instance_id);
auto completed = co_await pg->already_complete(m->get_reqid());
DEBUGDPP("{}.{}: past scrub blocker, getting obc",
*pg, *this, this_instance_id);
- auto obc_manager = pg->obc_loader.get_obc_manager(m->get_hobj());
-
- // initiate load_and_lock in order, but wait concurrently
- ihref.enter_stage_sync(
- client_pp(*pg).lock_obc, *this);
-
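+ // load_and_lock is now issued from within the per-object process stage;
+ // cross-request ordering on the same object comes from the orderer rather
+ // than the removed lock_obc stage.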
int load_err = co_await pg->obc_loader.load_and_lock(
obc_manager, pg->get_lock_type(op_info)
).si_then([]() -> int {
co_return;
}
- DEBUGDPP("{}.{}: got obc {}, entering process stage",
+ DEBUGDPP("{}.{}: obc {} loaded and locked, calling do_process",
*pg, *this, this_instance_id, obc_manager.get_obc()->obs);
- co_await ihref.enter_stage<interruptor>(
- client_pp(*pg).process, *this);
-
- DEBUGDPP("{}.{}: in process stage, calling do_process",
- *pg, *this, this_instance_id);
co_await do_process(
ihref, pg, obc_manager.get_obc(), this_instance_id
);
std::move(ox), m->ops);
co_await std::move(submitted);
}
- co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
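+ // wait_repop is now a per-object stage, so replication waits on one object
+ // no longer hold up requests targeting other objects in the same PG.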
+ co_await ihref.enter_stage<interruptor>(
+ ihref.obc_orderer->obc_pp().wait_repop, *this);
co_await std::move(all_completed);
}
- co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this);
+ co_await ihref.enter_stage<interruptor>(
+ ihref.obc_orderer->obc_pp().send_reply, *this);
if (ret) {
int err = -ret->value();
#include "osd/osd_op_util.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/object_context.h"
+#include "crimson/osd/object_context_loader.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/client_request_common.h"
// don't leave any references on the source core, so we just bypass it by using
// intrusive_ptr instead.
using ref_t = boost::intrusive_ptr<instance_handle_t>;
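+ // Set in process_op() once the target object is known; supplies the
+ // per-object pipeline (obc_pp()) whose stages are tracked below.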
+ std::optional<ObjectContextLoader::Orderer> obc_orderer;
PipelineHandle handle;
std::tuple<
- PGPipeline::AwaitMap::BlockingEvent,
+ CommonPGPipeline::WaitPGReady::BlockingEvent,
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
- PGPipeline::WaitForActive::BlockingEvent,
PGActivationBlocker::BlockingEvent,
- PGPipeline::RecoverMissing::BlockingEvent,
+ CommonPGPipeline::GetOBC::BlockingEvent,
+ CommonOBCPipeline::Process::BlockingEvent,
scrub::PGScrubber::BlockingEvent,
- PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent,
- PGPipeline::LockOBC::BlockingEvent,
- PGPipeline::Process::BlockingEvent,
- PGPipeline::WaitRepop::BlockingEvent,
- PGPipeline::SendReply::BlockingEvent,
+ CommonOBCPipeline::WaitRepop::BlockingEvent,
+ CommonOBCPipeline::SendReply::BlockingEvent,
CompletionEvent
> pg_tracking_events;
unsigned this_instance_id);
bool is_pg_op() const;
- PGPipeline &client_pp(PG &pg);
+ CommonPGPipeline &client_pp(PG &pg);
template <typename Errorator>
using interruptible_errorator =
LOG_PREFIX(InternalClientRequest::with_interruption);
assert(pg->is_active());
- co_await enter_stage<interruptor>(client_pp().recover_missing);
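+ // As in ClientRequest::process_op(), create the per-object orderer and OBC
+ // manager up front, then enter the shared OBC pipeline's process stage.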
+ obc_orderer = pg->obc_loader.get_obc_orderer(get_target_oid());
+ auto obc_manager = pg->obc_loader.get_obc_manager(
+ *obc_orderer,
+ get_target_oid());
+
+ co_await enter_stage<interruptor>(obc_orderer->obc_pp().process);
bool unfound = co_await do_recover_missing(
pg, get_target_oid(), osd_reqid_t());
std::make_error_code(std::errc::operation_canceled),
fmt::format("{} is unfound, drop it!", get_target_oid()));
}
- co_await enter_stage<interruptor>(
- client_pp().check_already_complete_get_obc);
- DEBUGI("{}: getting obc lock", *this);
+ DEBUGI("{}: generating ops", *this);
auto osd_ops = create_osd_ops();
std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
assert(ret == 0);
- auto obc_manager = pg->obc_loader.get_obc_manager(get_target_oid());
-
- // initiate load_and_lock in order, but wait concurrently
- enter_stage_sync(client_pp().lock_obc);
-
co_await pg->obc_loader.load_and_lock(
obc_manager, pg->get_lock_type(op_info)
).handle_error_interruptible(
crimson::ct_error::assert_all("unexpected error")
);
- DEBUGDPP("{}: got obc {}, entering process stage",
- *pg, *this, obc_manager.get_obc()->obs);
- co_await enter_stage<interruptor>(client_pp().process);
-
auto params = get_do_osd_ops_params();
OpsExecuter ox(
pg, obc_manager.get_obc(), op_info, params, params.get_connection(),
std::move(ox), osd_ops);
co_await std::move(submitted);
+
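+ // Replication completion is awaited in the per-object wait_repop stage,
+ // matching the external client request path.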
+ co_await enter_stage<interruptor>(obc_orderer->obc_pp().wait_repop);
+
co_await std::move(completed);
DEBUGDPP("{}: complete", *pg, *this);
#pragma once
#include "crimson/common/type_helpers.h"
+#include "crimson/osd/object_context_loader.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/client_request_common.h"
#include "crimson/osd/pg.h"
Ref<PG> pg;
epoch_t start_epoch;
OpInfo op_info;
+ std::optional<ObjectContextLoader::Orderer> obc_orderer;
PipelineHandle handle;
public:
std::tuple<
StartEvent,
- CommonPGPipeline::WaitForActive::BlockingEvent,
- PGActivationBlocker::BlockingEvent,
- CommonPGPipeline::RecoverMissing::BlockingEvent,
- CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent,
- CommonPGPipeline::LockOBC::BlockingEvent,
- CommonPGPipeline::Process::BlockingEvent,
+ CommonOBCPipeline::Process::BlockingEvent,
+ CommonOBCPipeline::WaitRepop::BlockingEvent,
CompletionEvent
> tracking_events;
};
SnapTrimObjSubEvent::snap_trim_obj_subevent_ret_t
SnapTrimObjSubEvent::start()
{
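+ // Create the per-object orderer before any obc_pp() stage is entered.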
+ obc_orderer = pg->obc_loader.get_obc_orderer(coid);
+
ceph_assert(pg->is_active_clean());
- auto exit_handle = seastar::defer([this] {
- logger().debug("{}: exit", *this);
- handle.exit();
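+ // handle.complete() may resolve after start() returns; capturing an IRef
+ // keeps this operation alive until the pipeline handle is fully drained.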
+ auto exit_handle = seastar::defer([this, opref = IRef(this)] {
+ logger().debug("{}: exit", *opref);
+ std::ignore = handle.complete().then([opref = std::move(opref)] {});
});
co_await enter_stage<interruptor>(
- client_pp().check_already_complete_get_obc);
+ obc_orderer->obc_pp().process);
logger().debug("{}: getting obc for {}", *this, coid);
auto obc_manager = pg->obc_loader.get_obc_manager(
+ *obc_orderer,
coid, false /* resolve_oid */);
co_await pg->obc_loader.load_and_lock(
logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid());
- co_await enter_stage<interruptor>(client_pp().process);
auto all_completed = interruptor::now();
{
// as with PG::submit_executer, we need to build the pg log entries
co_await std::move(submitted);
}
- co_await enter_stage<interruptor>(client_pp().wait_repop);
+ co_await enter_stage<interruptor>(obc_orderer->obc_pp().wait_repop);
co_await std::move(all_completed);
logger().debug("{}: completed", *this);
- co_await interruptor::make_interruptible(handle.complete());
}
void SnapTrimObjSubEvent::print(std::ostream &lhs) const
#include <iostream>
#include <seastar/core/future.hh>
+#include "crimson/osd/object_context_loader.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/common/subop_blocker.h"
}
Ref<PG> pg;
+ std::optional<ObjectContextLoader::Orderer> obc_orderer;
PipelineHandle handle;
osd_op_params_t osd_op_p;
const hobject_t coid;
std::tuple<
StartEvent,
- CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent,
- CommonPGPipeline::Process::BlockingEvent,
- CommonPGPipeline::WaitRepop::BlockingEvent,
+ CommonOBCPipeline::Process::BlockingEvent,
+ CommonOBCPipeline::WaitRepop::BlockingEvent,
CompletionEvent
> tracking_events;
};