});
}
-seastar::future<> ClientRequest::process_op(
- Ref<PG> &pgref)
+seastar::future<> ClientRequest::process_op(Ref<PG> &pg)
{
- PG& pg = *pgref;
- return with_blocking_future(
- handle.enter(pp(pg).recover_missing)
- ).then([this, &pg, pgref] {
- eversion_t ver;
- const hobject_t& soid = m->get_hobj();
- logger().debug("{} check for recovery, {}", *this, soid);
- if (pg.is_unreadable_object(soid, &ver) ||
- pg.is_degraded_or_backfilling_object(soid)) {
- logger().debug("{} need to wait for recovery, {}", *this, soid);
- if (pg.get_recovery_backend()->is_recovering(soid)) {
- return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered();
- } else {
- auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>(
- soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch());
- return std::move(fut);
- }
- }
- return seastar::now();
- }).then([this, &pg] {
- return with_blocking_future(handle.enter(pp(pg).get_obc));
- }).then([this, &pg, &pgref]() -> PG::load_obc_ertr::future<> {
- op_info.set_from_op(&*m, *pg.get_osdmap());
- return pg.with_locked_obc(m, op_info, this, [this, &pg, &pgref](auto obc) {
- return with_blocking_future(
- handle.enter(pp(pg).process)
- ).then([this, &pg, obc]()
- -> crimson::errorator<crimson::ct_error::eagain>::future<Ref<MOSDOpReply>> {
- if (!pg.is_primary()) {
- // primary can handle both normal ops and balanced reads
- if (is_misdirected(pg)) {
- logger().trace("process_op: dropping misdirected op");
- return seastar::make_ready_future<Ref<MOSDOpReply>>();
- } else if (const hobject_t& hoid = m->get_hobj();
- !pg.get_peering_state().can_serve_replica_read(hoid)) {
- auto reply = make_message<MOSDOpReply>(
- m.get(), -EAGAIN, pg.get_osdmap_epoch(),
- m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
- !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }
- }
- return pg.do_osd_ops(m, obc, op_info);
- }).safe_then([this](Ref<MOSDOpReply> reply) {
- if (reply) {
- return conn->send(std::move(reply));
- } else {
- return seastar::now();
- }
- }, crimson::ct_error::eagain::handle([this, &pgref] {
- return process_op(pgref);
- }));
+ return with_blocking_future(handle.enter(pp(*pg).recover_missing)).then([&] {
+ return do_recover_missing(pg);
+ }).then([&] {
+ return with_blocking_future(handle.enter(pp(*pg).get_obc));
+ }).then([this, &pg]() -> PG::load_obc_ertr::future<> {
+ op_info.set_from_op(&*m, *pg->get_osdmap());
+ return pg->with_locked_obc(m, op_info, this, [this, &pg](auto obc) {
+ return with_blocking_future(handle.enter(pp(*pg).process)).then(
+ [this, &pg, obc] {
+ return do_process(pg, obc);
+ });
});
- }).safe_then([pgref=std::move(pgref)] {
+ }).safe_then([pg=std::move(pg)] {
return seastar::now();
}, PG::load_obc_ertr::all_same_way([](auto &code) {
logger().error("ClientRequest saw error code {}", code);
}));
}
+seastar::future<> ClientRequest::do_recover_missing(Ref<PG>& pg)
+{
+ eversion_t ver;
+ const hobject_t& soid = m->get_hobj();
+ logger().debug("{} check for recovery, {}", *this, soid);
+ if (!pg->is_unreadable_object(soid, &ver) &&
+ !pg->is_degraded_or_backfilling_object(soid)) {
+ return seastar::now();
+ }
+ logger().debug("{} need to wait for recovery, {}", *this, soid);
+ if (pg->get_recovery_backend()->is_recovering(soid)) {
+ return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered();
+ } else {
+ auto [op, fut] =
+ osd.get_shard_services().start_operation<UrgentRecovery>(
+ soid, ver, pg, osd.get_shard_services(), pg->get_osdmap_epoch());
+ return std::move(fut);
+ }
+}
+
+seastar::future<>
+ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
+{
+ using do_ops_return_t =
+ crimson::errorator<crimson::ct_error::eagain>::future<Ref<MOSDOpReply>>;
+ return [&pg, obc]() -> do_ops_return_t {
+ if (!pg->is_primary()) {
+ // primary can handle both normal ops and balanced reads
+ if (is_misdirected(*pg)) {
+ logger().trace("process_op: dropping misdirected op");
+ return seastar::make_ready_future<Ref<MOSDOpReply>>();
+ } else if (const hobject_t& hoid = m->get_hobj();
+ !pg->get_peering_state().can_serve_replica_read(hoid)) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -EAGAIN, pg->get_osdmap_epoch(),
+ m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
+ !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }
+ }
+ return pg->do_osd_ops(m, obc, op_info);
+ }().safe_then([this](Ref<MOSDOpReply> reply) {
+ if (reply) {
+ return conn->send(std::move(reply));
+ } else {
+ return seastar::now();
+ }
+ }, crimson::ct_error::eagain::handle([this, &pg] {
+ return process_op(pg);
+ }));
+}
+
bool ClientRequest::is_misdirected(const PG& pg) const
{
// otherwise take a closer look